In [1]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [2]:
import os

# `sc` is not created anywhere in this notebook; presumably it is the
# SparkContext pre-created by the PySpark kernel -- TODO confirm. It is stopped
# before a new session is built further down.
sc.stop()

# Move the working directory two levels up and remember it as the project root.
# The guard keeps the cell idempotent: without it, every re-run would climb two
# more directories and silently change BASE_DIR (and every path derived from it).
if "BASE_DIR" not in globals():
    os.chdir(os.path.join("..", ".."))
    BASE_DIR = os.getcwd()

# Kafka connection settings.
TOPIC = "topic_one"
BOOTSTRAP_SERVER = "localhost:9092"
ZOOKEEPER = "localhost:2181"  # not referenced by any cell visible in this notebook

In [3]:
# file:// URIs of the input and output data folders under the project root.
input_dir = "file://{}/00_input/data/".format(BASE_DIR)
output_dir = "file://{}/00_output/data/".format(BASE_DIR)

In [3]:
# os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages org.apache.spark:spark-sql-kafka-0-10_2.11 org.apache.kafka:kafka-clients-2.4.0 pyspark-shell"

In [5]:
# Build the SparkSession used by the cells below. The Kafka connector and the
# Kafka client jars are loaded from the project's jars/ folder through the
# `spark.jars` setting, which takes a comma-separated list of jar locations.
kafka_jars = ",".join([
    f"file://{BASE_DIR}/jars/spark-sql-kafka-0-10_2.11-2.4.4.jar",
    f"file://{BASE_DIR}/jars/kafka-clients-2.4.0.jar",
])

spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .master("local")
    .config("spark.jars", kafka_jars)
    .getOrCreate()
)

In [6]:
# Read a static sample of tweets so Spark can infer their JSON schema; the
# schema object is reused below to parse the streamed messages.
sample_tweets_df = spark.read.json(f"{input_dir}tweets.json")
tweets_schema = sample_tweets_df.schema

In [18]:
# List the top-level field names of the inferred tweet schema.
tweets_schema.fieldNames()

['contributors',
 'coordinates',
 'created_at',
 'display_text_range',
 'entities',
 'extended_tweet',
 'favorite_count',
 'favorited',
 'filter_level',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'place',
 'quote_count',
 'reply_count',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'timestamp_ms',
 'truncated',
 'user']

In [19]:
 # Subscribe to the Kafka topic and expose its messages as a streaming DataFrame.
 stream_df = (
     spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
     .option("subscribe", TOPIC)
     .load()
 )

In [41]:
# Parse each Kafka message's `value` column (cast to a string) as tweet JSON
# using the inferred schema, then keep only the fields of interest.
# The parsed struct gets an explicit alias: the auto-generated column name
# ("jsontostructs(CAST(value AS STRING))") is an internal detail that is not
# stable across Spark versions, so it must not be referenced by name.
query_df = (
    stream_df
    .select(from_json(col("value").cast("string"), tweets_schema).alias("tweet"))
    .select("tweet.*")
    .select("created_at", "id", "lang", "source", "text", "user.name", "user.screen_name")
)

In [42]:
# Print the schema of the selected tweet fields.
query_df.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- name: string (nullable = true)
 |-- screen_name: string (nullable = true)



In [43]:
# Start a streaming query that prints the selected tweet fields to the console.
query = (
    query_df.writeStream
    .format("console")
    .start()
)

In [30]:
# Start a streaming query that writes the selected tweet fields as Parquet
# files under the output folder, with its checkpoint data stored next to them.
# NOTE: this rebinds `query`, so a query started earlier under the same name is
# no longer reachable through it (it can still be found in `spark.streams.active`).
query = (
    query_df.writeStream
    .format("parquet")
    .option("path", output_dir + "twitter_txt")
    .option("checkpointLocation", output_dir + "checkpoints")
    .start()
)

# To write less frequently, add `.trigger(processingTime="120 seconds")` before `.start()`.

In [32]:
# Block this cell until the streaming query currently bound to `query` terminates.
query.awaitTermination()