In [1]:
import sys, re
import time
import datetime

In [2]:
# Kafka topics: raw tweets are consumed from the Step 1 topic; the enriched
# tweets produced by this notebook are published to the Step 2 topic.
TOPIC_Step1_NAME, TOPIC_Step2_NAME = "Sahamyab-Tweets", "Sahamyab-Tweets-Processed"
# Bootstrap broker as host:port (presumably the in-network listener — confirm).
KAFKA_SERVER = "kafka-broker:29092"

In [3]:
import os
# https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

# Ask spark-submit to fetch the Kafka source/sink connector from Maven. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches its JVM, so this must run before the session
# below is created; if a JVM already exists in this kernel, the variable has no effect.
# NOTE(review): the connector coordinates (Scala 2.12, version 3.1.2) presumably have to
# match the cluster's Spark/Scala build — confirm when upgrading.
os.environ['PYSPARK_SUBMIT_ARGS']='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# NOTE(review): these wildcard imports are what bring udf, col, from_json, to_json,
# StructType, ArrayType, etc. into scope for the later cells.
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Connect to the standalone master with a deliberately small footprint: 512 MB of
# executor memory and at most one core in total for this application.
# spark.sql.session.timeZone=Asia/Tehran controls how every later cell parses timestamp
# strings that carry no zone information and how timestamps are rendered back to text.
# getOrCreate() returns the already-running session if this cell is re-executed.
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("Step3_2-Simple-Pipeline") \
    .config("spark.executor.memory", "512mb") \
    .config("spark.executor.cores","1") \
    .config("spark.cores.max", "1") \
    .config("spark.sql.session.timeZone", "Asia/Tehran") \
    .getOrCreate()    
    
    



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b8ccb440-f252-4562-b718-ebdd90920735;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 486ms :: artifacts dl 8ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [default]
	

In [4]:
# Keep notebook output readable: suppress Spark's INFO/WARN log lines and show only ERROR and above.
spark.sparkContext.setLogLevel("ERROR")


In [5]:
def get_tags(text):
    """Return the hashtags found in ``text``, without their leading ``#``.

    A hashtag is ``#`` followed by one or more word characters (letters in any
    script, digits, underscore), so Persian tags are matched as well as Latin
    ones. A tag ends at the first non-word character; note that this includes the
    zero-width non-joiner (U+200C) often used inside Persian words.

    Args:
        text: Tweet body. May be ``None`` (e.g. a tweet whose JSON has no
            ``content`` field, which the nullable schema decodes as null).

    Returns:
        List of tag strings in order of appearance; empty when there are no tags
        or ``text`` is ``None``.
    """
    # re.findall(None) raises TypeError; inside a Python UDF that fails the Spark
    # task and stops the whole streaming query, so treat a missing body as "no tags".
    if text is None:
        return []
    return re.findall(r"#(\w+)", text)

In [6]:
# Spark UDF wrapper around get_tags; the declared SQL result type is array<string>.
get_tags_udf = udf(f=get_tags, returnType=ArrayType(elementType=StringType()))

In [7]:
# https://sparkbyexamples.com/spark/spark-sql-structtype-on-dataframe/
# Shape of one raw tweet as JSON on the input topic. Every field is read as a
# nullable string; parsing into richer types happens in the transform cell.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "id",
        "content",
        "sendTime",
        "sendTimePersian",
        "senderName",
        "senderUsername",
        "type",
    )
])

In [8]:
# Streaming source: the raw-tweets Kafka topic.
# - startingOffsets=earliest: a brand-new query replays the topic from the start.
#   Per the Spark docs, a query restarted from an existing checkpoint resumes from
#   the checkpoint instead.
# - failOnDataLoss=false: do not fail the query when offsets have been aged out
#   or deleted.
# - kafka.group.id: fixed consumer group name. Spark's docs advise care, because
#   concurrent queries sharing a group id can interfere with each other.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("subscribe", TOPIC_Step1_NAME)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .option("kafka.group.id", "step3_2-Simple-Pipeline")
    .load()
)

In [9]:
# Kafka delivers key/value as binary; expose both as text.
tweetsStringDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Decode each JSON value with `schema`, flatten the fields to top-level columns,
# then derive hashtags, timestamps and Persian-calendar date parts.
#
# NOTE(review): the timestamp pattern matches the trailing 'Z' (normally the UTC
# designator) as a literal. As a result, sendTime is parsed as a wall-clock time
# in the session time zone (Asia/Tehran). from_utc_timestamp then re-reads that
# wall clock as UTC and converts it to Asia/Tehran. Rendered as text in the
# session zone:
#   - "timestamp" shows the same digits as sendTime;
#   - "persian_timestamp" shows those digits converted from UTC to Tehran time.
# The two steps only make sense together. Changing either one alone shifts
# the output.
tweetsDF = (
    tweetsStringDF
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("hashtags", get_tags_udf(col("content")))
    .withColumn(
        "timestamp",
        unix_timestamp("sendTime", "yyyy-MM-dd'T'HH:mm:ss'Z'").cast('timestamp'),
    )
    .withColumn(
        "persian_timestamp",
        from_utc_timestamp("timestamp", "Asia/Tehran").cast('timestamp'),
    )
    # Column.substr is 1-based (Spark treats position 0 the same as 1). The
    # offsets presumably assume sendTimePersian looks like "YYYY/MM/DD ..." —
    # TODO confirm against the producer.
    .withColumn("persianYear", col("sendTimePersian").substr(1, 4))
    .withColumn("persianMonth", col("sendTimePersian").substr(6, 2))
    .withColumn("persianDay", col("sendTimePersian").substr(9, 2))
)
            


In [10]:
# Sink: one Kafka record per tweet, keyed by the tweet id. The value is a JSON
# object holding every column of tweetsDF.
# Nulls are replaced before serialization because to_json omits null fields:
#   - "hashtags" gets an empty array;
#   - every other column gets "".
# Under Spark's default (non-ANSI) type coercion, coalescing with "" also turns
# the timestamp columns into strings.
# The checkpoint directory records progress, so a restarted query continues
# where it stopped.
kafka_writer = (
    tweetsDF
    .select(
        col("id").alias("key"),
        to_json(struct([
            coalesce(col(name), array([]) if name == "hashtags" else lit("")).alias(name)
            for name in tweetsDF.columns
        ])).alias("value"),
    )
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("topic", TOPIC_Step2_NAME)
    .option("checkpointLocation", "/opt/spark-apps/checkpoints/step3_2-simple-pipeline")
    .start()
)

# Debugging alternative: swap the Kafka sink above for a console sink.
#     .writeStream \
#     .outputMode('append')\
#     .format('console')\
#     .start()

                                                                                

In [11]:
# Stop the streaming query. Its progress stays in the checkpoint directory, so
# starting it again resumes from the last committed offsets rather than "earliest".
kafka_writer.stop()