In [None]:
# Extra packages are passed on the command line when launching the PySpark shell.
# The Kafka source/sink used in sections 3-5 come from these --packages.
# Run this in a terminal, not in a notebook cell (it is not Python):
#
#   pyspark --master local[2] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2

## 1) Writing a CSV file to HDFS

In [None]:
import os

import pandas as pd
from pyspark.sql import SparkSession

data = {
    "id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "firstname": ["Babita", "Zsa Zsa", "Vonny", "Ermengarde", "Karina", "Felice", "Elsie", "Kaia", "Glynnis", "Jany"],
    "age": [27, 39, 48, 24, 51, 36, 55, 36, 43, 28],
    "profession": ["Lawyer", "Musician", "Police Officer", "Teacher", "Software Developer", "Doctor", "Police Officer", "Police Officer", "Designer", "Lawyer"],
    "city": ["Riverside", "Malé", "Bahía Blanca", "Porto Alegre", "Amritsar", "Montreal", "City of San Marino", "Gaza", "Hamburg", "Belize City"],
    "salary": [1558, 5667, 7612, 2451, 3522, 9874, 2231, 2263, 6983, 8769]
}

# Write the sample data to a CSV file on the driver's local disk.
csv_file_path = "example_data.csv"
df = pd.DataFrame(data)
df.to_csv(csv_file_path, index=False)

spark = SparkSession.builder.appName("CSV to HDFS").getOrCreate()
hdfs_output_path = "hdfs:///user/vagrant/kafka/data"

# Read the local CSV through an explicit file:// URI. A bare relative path is
# resolved against Spark's default filesystem, which here is presumably HDFS
# (section 2 reads /user/vagrant/... without a scheme), where this file does not exist.
local_csv_uri = "file://" + os.path.abspath(csv_file_path)
spark_df = spark.read.csv(local_csv_uri, header=True, inferSchema=True)
spark_df.show()

# Save to HDFS as CSV with a header row, which is the layout section 2
# streams with format("csv") and option("header", True).
spark_df.write.mode("overwrite").csv(hdfs_output_path, header=True)

# The session is intentionally left running: the streaming sections below reuse `spark`.

## 2) Streaming from CSV files

In [None]:
from pyspark.sql.functions import avg, count, desc
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

# Schema of the CSV files written in section 1 (same column order and names).
schemal = StructType([
    StructField('id', IntegerType(), True),
    StructField('firstname', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('profession', StringType(), True),
    StructField('city', StringType(), True),
    StructField('salary', DoubleType(), True),
])

# Stream the directory one file per micro-batch, using the StructType defined above.
customer = (
    spark.readStream.format("csv")
    .schema(schemal)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load("/user/vagrant/kafka/data/")
)

# Average salary and head count per profession, highest average first.
average_salaries = (
    customer
    .groupBy("profession")
    .agg(avg("salary").alias("average_salary"), count("profession").alias("count"))
    .sort(desc("average_salary"))
)

query = average_salaries.writeStream.format("console").outputMode("complete").start()
# Let the query consume every file currently available before stopping it.
# Stopping right after start() can end it before any batch is printed.
query.processAllAvailable()
query.stop()

## 3) Streaming from Kafka (receiving live messages from a producer)

In [None]:
from pyspark.sql.functions import col, from_json, struct, to_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Subscribe to the "streamtest" topic and cast each message payload to a string.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "streamtest")
    .load()
)
df1 = df.selectExpr("CAST(value AS STRING)")

# Expected shape of each JSON message.
schemap = StructType([
    StructField('id', IntegerType(), True),
    StructField('firstname', StringType(), True),
    StructField('middlename', StringType(), True),
    StructField('lastname', StringType(), True),
    StructField('dob_year', IntegerType(), True),
    StructField('dob_month', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True),
])

# Parse each JSON message into typed columns.
pdf = df1.select(from_json(col("value"), schemap).alias("data")).select("data.*")

# Three sinks for the parsed stream:
#   q1 - display on the console
#   q2 - save as CSV files in HDFS
#   q3 - publish to another Kafka topic after converting every field to a string
# q2 and q3 each get their own checkpoint directory; two queries must not share one.

q1 = pdf.writeStream.format("console").outputMode("append").start()
# Call q1.awaitTermination() to block until the query ends.

q2 = (
    pdf.writeStream.format("csv").outputMode("append")
    .option("path", "kafka/data/")
    .option("checkpointLocation", "/tmp/vagrant/checkpoint/q2_csv")
    .start()
)

# Cast every field to a string, then serialize the whole row as a JSON `value` column.
mpdf1 = (
    pdf.selectExpr(
        "CAST(id AS STRING)",
        "CAST(firstname AS STRING)",
        "CAST(middlename AS STRING)",
        "CAST(lastname AS STRING)",
        "CAST(dob_year AS STRING)",
        "CAST(dob_month AS STRING)",
        "CAST(gender AS STRING)",
        "CAST(salary AS STRING)",
    )
    .withColumn("value", to_json(struct("*")))
)

q3 = (
    mpdf1
    .select("value")
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("topic", "streamout")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/tmp/vagrant/checkpoint/q3_kafka")
    .start()
)

In [None]:
# Sample messages to send from a producer to the "streamtest" topic read in
# section 3, one JSON object per line (e.g. paste them into a console producer).
# Their fields match `schemap` defined in section 3.
# This cell is reference data only: running it just evaluates the dict literals
# (Jupyter echoes the last one) and does not send anything to Kafka.
{"id":1,"firstname":"James ","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}
{"id":2,"firstname":"Michael ","middlename":"Rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}
{"id":3,"firstname":"Robert ","middlename":"","lastname":"Williams","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}
{"id":4,"firstname":"Maria ","middlename":"Anne","lastname":"Jones","dob_year":2005,"dob_month":5,"gender":"F","salary":4000}
{"id":5,"firstname":"Jen","middlename":"Mary","lastname":"Brown","dob_year":2010,"dob_month":7,"gender":"","salary":-1}

In [None]:
# Shut down the console, CSV and Kafka queries from section 3, in that order.
for running_query in (q1, q2, q3):
    running_query.stop()

## 4) Streaming from a socket

In [None]:
# Names used by the socket and Kafka word-processing cells below (sections 4 and 5).
from pyspark.sql.functions import col, explode, split
from pyspark.sql.types import StringType

In [None]:
# Text stream read line by line from a TCP socket on localhost:9999
# (e.g. one opened with `nc -lk 9999`).
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)
# Confirm this is a streaming DataFrame (displays True).
lines.isStreaming

In [None]:
# Split each incoming line on spaces into one row per word.
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Running count per word, most frequent first.
wordCounts = words.groupBy("word").count().sort('count', ascending=False)

# Complete mode re-prints the whole sorted table on every trigger.
# The handle has its own name so the Kafka query started in the next cell
# does not overwrite it; stop it with socket_query.stop() when finished.
socket_query = wordCounts.writeStream.outputMode("complete").format("console").start()

In [None]:
# Re-read the "streamtest" topic, treating each message payload as plain text.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "streamtest")
    .load()
)
df1 = df.selectExpr("CAST(value AS STRING)")

# One row per space-separated word.
words = df1.select(explode(split(df1.value, " ")).alias("word"))

# Keep words containing a lowercase 'z' or matching a simple email pattern.
# Raw string literal: "\." is not a valid escape in a normal Python string.
EMAIL_PATTERN = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
filtered_words = words.filter(
    col("word").rlike(".*z.*") | col("word").rlike(EMAIL_PATTERN)
)

# Print matching words as they arrive. The handle has its own name so it does not
# overwrite the socket query from the previous cell; stop it with filter_query.stop().
filter_query = (
    filtered_words.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

## 5) Publishing the word counts as a Kafka stream
##### As key/value pairs: the key is a word, the value is its count

In [None]:
from pyspark.sql.functions import col, explode, split
from pyspark.sql.types import StringType

# One row per word from the Kafka text stream (`df1`, defined in section 4),
# named "key" so each published message is keyed by its word.
words2 = df1.select(explode(split(df1.value, " ")).alias("key"))

# Count per word, exposed as a string column named "value".
wordCounts2 = (
    words2.groupBy("key").count()
    .withColumnRenamed("count", "value")
    .withColumn("value", col("value").cast(StringType()))
)

# Publish changed counts to the "streamout" topic. start() launches the query;
# the checkpoint directory is dedicated to this query.
query2 = (
    wordCounts2.writeStream
    .format("kafka")
    .outputMode("update")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "streamout")
    .option("checkpointLocation", "/tmp/vagrant/checkpoint/word_counts")
    .start()
)
# Stop with query2.stop() when finished.