In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m548.4 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l|

In [None]:
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

In [2]:
def write_to_cassandra(target_df, batch_id):
    target_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "spark_db") \
        .option("table", "customer_search") \
        .mode("append") \
        .save()
    target_df.show()

In [3]:
spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("Stream Table Join Demo") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .config("spark.cassandra.connection.host", "localhost") \
        .config("spark.cassandra.connection.port", "9042") \
        .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
        .config("spark.sql.catalog.lh", "com.datastax.spark.connector.datasource.CassandraCatalog") \
        .getOrCreate()

In [4]:
search_schema = StructType([
        StructField("id", StringType()),
        StructField("product_searched", StringType()),
        StructField("customer_id", StringType()),
        StructField("location_id", StringType()),
        StructField("CreateDate", StringType())
    ])

In [5]:
kafka_source_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "product-search") \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", False) \
        .load()

In [6]:
value_df = kafka_source_df.select(from_json(col("value").cast("string"), search_schema).alias("value"))

In [7]:
search_df = value_df.select("value.*") 
#         .withColumn("CreateDate", to_timestamp(col("CreateDate"), "yyyy-MM-dd HH:mm:ss"))

In [8]:
search_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- product_searched: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- CreateDate: string (nullable = true)



In [9]:
#add csv withlocation to do a join

schema_csv = StructType([
        StructField("location_id", StringType()),
        StructField("name", StringType()),    
        StructField("country_id", StringType()),
        StructField("country_code", StringType()),
        StructField("country_name", StringType()),
        StructField("state_code", StringType()),
        StructField("type", StringType()),
        StructField("latitude", StringType()),
        StructField("longitude", StringType())    
    ])

In [10]:
#schema_csv=(StructType().add("location_code",StringType()).add("location_region",StringType()).add("location_country",StringType()))
locations = spark.read.format("csv")\
     .option("header", True)\
     .schema(schema_csv)\
     .load("zLocations.csv").alias("locations")

In [11]:
search_locations_df = search_df.join(locations, search_df["location_id"]==locations["location_id"], "left")

In [12]:
customer_df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "spark_db") \
        .option("table", "customer") \
        .load()

In [13]:
join_expr = customer_df.customer_id == search_locations_df.customer_id
join_type = "inner"

In [14]:
joined_df = customer_df.join(search_locations_df, join_expr, join_type) \
        .drop(customer_df.customer_id)

In [15]:
joined_df.printSchema()

root
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- product_searched: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- CreateDate: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- country_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- type: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [16]:
output_df = joined_df.select(col("id"), col("customer_id"), col("user_name").alias("customer_name"), col("product_searched"), 
                             col("CreateDate").alias("search_date"), col("country_name"),col("name").alias("state"))
#                             .withColumn("idNum", col("customer_id").cast(IntegerType())) \
#                             .withColumn("id",    col("id").cast(IntegerType()))

In [17]:
kafka_target_df = output_df.selectExpr("id as key",
                                                 """to_json(named_struct(
                                                 'id', id,
                                                 'customer_id', customer_id,
                                                 'customer_name', customer_name,
                                                 'product_searched', product_searched,
                                                 'search_date', search_date,
                                                 'country_name', country_name,
                                                 'state', state
                                                 )) as value""")

In [18]:
notification_writer_query = kafka_target_df \
        .writeStream \
        .queryName("Product Customer Writer") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "product-customer-qty") \
        .outputMode("append") \
        .option("checkpointLocation", "./checkpoints/joins-proj/") \
        .start()

In [None]:
notification_writer_query.awaitTermination()