In [1]:
from caseconverter import snakecase
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

In [2]:
# Maven coordinates passed to Spark as "spark.jars.packages": the MySQL JDBC
# driver, Spark SQL, the Kafka source/sink and the Cassandra connector.
spark_packages = ",".join(
    [
        "mysql:mysql-connector-java:8.0.26",
        "org.apache.spark:spark-sql_2.12:3.1.2",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
        "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0",
    ]
)

# Configure the builder step by step, then attach to the standalone master.
session_builder = SparkSession.builder.appName("SampleEtlJob1")
session_builder = session_builder.config("spark.jars.packages", spark_packages)
session_builder = session_builder.master("spark://spark:7077")
spark = session_builder.getOrCreate()



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
mysql#mysql-connector-java added as a dependency
org.apache.spark#spark-sql_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5a16b295-8702-4250-bac8-17820d011e68;1.0
	confs: [default]
	found mysql#mysql-connector-java;8.0.26 in central
	found com.google.protobuf#protobuf-java;3.11.4 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-p

In [3]:
# Product catalog: a pipe-delimited text file whose first line is a header.
catalog_path: str = "/mounts/spark-share/Products1.txt"
catalog_reader = spark.read.option("header", True).option("delimiter", "|")
catalog_df = catalog_reader.csv(catalog_path)
catalog_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+--------------------+--------+--------------+--------+---------+
|Manufacturer|        Product Name|    Size|      itemType|     SKU|BasePrice|
+------------+--------------------+--------+--------------+--------+---------+
|   Zatarains|  Jambalaya Rice Mix|   12 oz| Rice/Rice Mix|42081001|    $2.49|
|   Zatarains|  Jambalaya Rice Mix|    8 oz| Rice/Rice Mix|42082001|    $1.79|
|     Yucatan|   Guacamole Regular|    8 oz|          null|42083001|    $3.99|
|       Yuban|Coffee Original B...|   12 oz|Coffee/Creamer|42084001|    $3.99|
|     Yoplait| GoGurt Variety Pack|    8 ct|        Yogurt|42085001|    $2.99|
|    Wishbone|    Italian Dressing|   16 oz|Salad Dressing|42086001|    $2.00|
|White Castle|Cheeseburger Heat...|29.28 oz|          null|42087001|   $11.59|
|     Whiskas| Choice Cuts Poultry|   36 oz|      Pet Food|42088001|    $4.99|
|      Welchs|Farmers Pick Conc...|   46 oz|          null|42089001|    $3.59|
|      Welchs|     Juice Red Grape|   64 oz|        

                                                                                

In [4]:
# Replace missing itemType values with "OTHER" while the column still carries
# its original (pre-rename) name.
catalog_df = catalog_df.fillna("OTHER", ["itemType"])

# Upper-case every field and convert every column title to snake case in a
# single projection. Calling withColumn/withColumnRenamed once per column in a
# loop adds a new projection to the query plan on every iteration; one select
# produces the same columns, in the same order, with a flat plan.
catalog_df = catalog_df.select(
    [
        F.upper(catalog_df[name]).alias(snakecase(name))
        for name in catalog_df.columns
    ]
)

catalog_df.show()

+------------+--------------------+--------+--------------+--------+----------+
|manufacturer|        product_name|    size|     item_type|     sku|base_price|
+------------+--------------------+--------+--------------+--------+----------+
|   ZATARAINS|  JAMBALAYA RICE MIX|   12 OZ| RICE/RICE MIX|42081001|     $2.49|
|   ZATARAINS|  JAMBALAYA RICE MIX|    8 OZ| RICE/RICE MIX|42082001|     $1.79|
|     YUCATAN|   GUACAMOLE REGULAR|    8 OZ|         OTHER|42083001|     $3.99|
|       YUBAN|COFFEE ORIGINAL B...|   12 OZ|COFFEE/CREAMER|42084001|     $3.99|
|     YOPLAIT| GOGURT VARIETY PACK|    8 CT|        YOGURT|42085001|     $2.99|
|    WISHBONE|    ITALIAN DRESSING|   16 OZ|SALAD DRESSING|42086001|     $2.00|
|WHITE CASTLE|CHEESEBURGER HEAT...|29.28 OZ|         OTHER|42087001|    $11.59|
|     WHISKAS| CHOICE CUTS POULTRY|   36 OZ|      PET FOOD|42088001|     $4.99|
|      WELCHS|FARMERS PICK CONC...|   46 OZ|         OTHER|42089001|     $3.59|
|      WELCHS|     JUICE RED GRAPE|   64

In [5]:
# Source coordinates for the transactions table; the URL and query are derived
# from the database and table names.
transactions_db: str = "backup"
transactions_table: str = "transactions_subset"
transactions_url: str = "jdbc:mysql://mariadb:3306/{db}".format(db=transactions_db)
transactions_query: str = "SELECT * FROM {table}".format(table=transactions_table)

# All JDBC reader settings gathered in one place.
jdbc_options = {
    "url": transactions_url,
    "driver": "com.mysql.cj.jdbc.Driver",
    "query": transactions_query,
    "user": "root",
}
transactions_df = spark.read.format("jdbc").options(**jdbc_options).load()

transactions_df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+--------------+-----------+--------+----------+----------------+
|     id|transaction_id|customer_id|     sku|sale_price|transaction_date|
+-------+--------------+-----------+--------+----------+----------------+
|5047396|        108471|       4631|42358001|      2.02|      2020-04-05|
|5047397|        108471|       4631|44123001|      3.73|      2020-04-05|
|5047398|        108471|       4631|43128001|      3.73|      2020-04-05|
|5047399|        108471|       4631|42235001|      3.21|      2020-04-05|
|5047400|        108471|       4631|43577001|      3.34|      2020-04-05|
|5047401|        108471|       4631|44089001|      2.77|      2020-04-05|
|5047402|        108471|       4631|43496001|      4.80|      2020-04-05|
|5047403|        108471|       4631|43968001|      1.38|      2020-04-05|
|5047404|        108471|       4631|43491001|      3.95|      2020-04-05|
|5047405|        108471|       4631|42535001|      4.27|      2020-04-05|
|5047406|        108471|       4631|43

                                                                                

In [6]:
# Enrich every transaction with the catalog attributes of its SKU.
transactions_aliased = transactions_df.alias("t")
catalog_aliased = catalog_df.alias("c")
joined_df = transactions_aliased.join(
    catalog_aliased,
    on=transactions_df.sku == catalog_df.sku,
    how="inner",
).select("t.*", "c.manufacturer", "c.product_name", "c.size")
joined_df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+-------+--------------+-----------+--------+----------+----------------+---------------+--------------------+--------+
|     id|transaction_id|customer_id|     sku|sale_price|transaction_date|   manufacturer|        product_name|    size|
+-------+--------------+-----------+--------+----------+----------------+---------------+--------------------+--------+
|5047396|        108471|       4631|42358001|      2.02|      2020-04-05|    ROWAN DAIRY|          2.00% MILK| 1/2 GAL|
|5047397|        108471|       4631|44123001|      3.73|      2020-04-05|         ARNOLD|SANDWICH THINS MU...|   12 OZ|
|5047398|        108471|       4631|43128001|      3.73|      2020-04-05|        HERSHEY|     SYRUP CHOCOLATE|   48 OZ|
|5047399|        108471|       4631|42235001|      3.21|      2020-04-05|      TASTYKAKE|      CUPCAKES LEMON|12.75 OZ|
|5047400|        108471|       4631|43577001|      3.34|      2020-04-05|    EVEN BETTER|EVEN BETTER CHEES...|     6.9|
|5047401|        108471|       4631|4408

                                                                                

In [7]:
# Print the column names and types of the joined frame.
joined_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- transaction_id: decimal(20,0) (nullable = true)
 |-- customer_id: decimal(20,0) (nullable = true)
 |-- sku: long (nullable = true)
 |-- sale_price: decimal(38,2) (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- size: string (nullable = true)



In [8]:
KEYSPACE = "demo_keyspace"
TABLE = "demo_table"

# Open a driver session against the "cassandra" host using password auth.
auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
cluster = Cluster(["cassandra"], port=9042, auth_provider=auth_provider)
session = cluster.connect()

# Create the keyspace if it is missing, then make it the session default.
create_keyspace_cql = (
    """
    CREATE KEYSPACE IF NOT EXISTS %s
    WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
    """
    % KEYSPACE
)
rs = session.execute(create_keyspace_cql)
session.set_keyspace(KEYSPACE)

In [9]:
# Drop any previous copy of the table and recreate it with one column per
# field of the joined frame, keyed by id.
drop_table_cql = "DROP TABLE IF EXISTS {table}".format(table=TABLE)
create_table_cql = (
    """
    CREATE TABLE IF NOT EXISTS %s (
        id int,
        transaction_id bigint,
        customer_id bigint,
        sku bigint,
        sale_price decimal,
        transaction_date date,
        manufacturer text,
        product_name text,
        size text,
        PRIMARY KEY (id)
    )
    """
    % TABLE
)

session.execute(drop_table_cql)
rs = session.execute(create_table_cql)

In [10]:
# Connector settings for the write; "confirm.truncate" is set to "true"
# alongside overwrite mode.
cassandra_write_options = {
    "confirm.truncate": "true",
    "spark.cassandra.connection.host": "cassandra",
    "spark.cassandra.connection.port": "9042",
    "spark.cassandra.auth.username": "cassandra",
    "spark.cassandra.auth.password": "cassandra",
    "keyspace": KEYSPACE,
    "table": TABLE,
}

cassandra_writer = joined_df.write.format("org.apache.spark.sql.cassandra").mode(
    "overwrite"
)
cassandra_writer.options(**cassandra_write_options).save()

[Stage 7:>                                                          (0 + 1) / 1]

KeyboardInterrupt: 

In [None]:
# Description of a single data store: type and url are non-nullable, the
# remaining fields are nullable.
data_store_struct = (
    StructType()
    .add("data_store_type", StringType(), False)
    .add("url", StringType(), False)
    .add("database", StringType(), True)
    .add("table", StringType(), True)
    .add("query", StringType(), True)
)

# Pipeline metadata: a list of source stores plus one destination store.
meta_data_schema = (
    StructType()
    .add("sources", ArrayType(data_store_struct), True)
    .add("destination", data_store_struct, True)
)

# The catalog file read from disk.
catalog_source = {
    "data_store_type": "disk",
    "url": catalog_path,
}

# The MariaDB transactions query.
transactions_source = {
    "data_store_type": "mariadb",
    "url": transactions_url,
    "database": transactions_db,
    "table": transactions_table,
    "query": transactions_query,
}

# The Cassandra table the joined frame is written to.
cassandra_destination = {
    "data_store_type": "cassandra",
    "url": "cassandra:9042",
    "database": KEYSPACE,
    "table": TABLE,
}

meta_data = [
    {
        "sources": [catalog_source, transactions_source],
        "destination": cassandra_destination,
    }
]

meta_data_df = spark.createDataFrame(data=meta_data, schema=meta_data_schema)
meta_data_df.printSchema()

In [None]:
# Pair every joined record with the pipeline metadata. An explicit crossJoin
# states that the cartesian product is intentional instead of relying on a
# condition-less join() being accepted by the session configuration.
records_with_meta_df = joined_df.alias("joined").crossJoin(
    meta_data_df.alias("meta")
)

# Build the Kafka message: the record id (as a string) is the key, and the
# value is a JSON document holding the record and its metadata.
kafka_messages_df = records_with_meta_df.select(
    # Qualify the column so it always resolves to the record's id.
    F.col("joined.id").cast("string").alias("key"),
    F.to_json(
        F.struct(
            F.struct("joined.*").alias("record"),
            F.struct("meta.*").alias("meta_data"),
        )
    ).alias("value"),
)

# Publish the messages to the "persisted" topic.
kafka_messages_df.write.format(
    "kafka"
).option(
    "kafka.bootstrap.servers", "kafka:29092"
).option(
    "topic", "persisted"
).save()

In [None]:
# Release the Cassandra driver connections opened by cluster.connect() before
# stopping Spark; otherwise the driver's connections stay open after the
# notebook's last cell.
cluster.shutdown()
spark.stop()