In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json, to_date, col, to_utc_timestamp, explode, split
from pyspark.sql.types import LongType, StructType, StringType
from datetime import datetime
import os
import pytz
import sys
import yaml
sys.path.append('../')

from tweet_parser import TweetParser

spark = SparkSession \
    .builder \
    .appName('Wordle score streaming') \
    .getOrCreate()

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9008) \
    .load()

## Converting date string format
def getDate(x):
    if x is not None:
        return str(datetime.strptime(x,'%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %H:%M:%S"))
    else:
        return None

def getResults(text):
    return TweetParser(text).wordle_result_exist()

## UDF declaration
date_fn = udf(getDate, StringType())
attempts_fn = udf(lambda  x: getResults(x), StringType())

schema = StructType(). \
    add('id', LongType(), False). \
    add('created_at', StringType(), False) .\
    add('user', StructType().add("id_str",StringType(), False), False). \
    add('text', StringType(), False)

filtered_data = lines \
    .selectExpr('CAST(value AS STRING)') \
    .select(from_json('value', schema).alias('tweet_data')) \
    .selectExpr('tweet_data.id', 'tweet_data.created_at', 'tweet_data.user.id_str AS user_id', 'tweet_data.text AS message') \
    .withColumn("created_at", to_utc_timestamp(date_fn("created_at"),"UTC")) \
    .withColumn('results', attempts_fn(col('message')))

filtered_data = filtered_data.filter(col('results') != "false")
filtered_data.printSchema()

    
def postgres_sink(df, batch_id):
    config = ""
    with open('../secrets.yml', 'r') as file:
        config = yaml.safe_load(file)
        
    dbname = config['dbname']
    dbtable = 'tweets'
    dbuser = config['dbuser']
    dbpass = config['dbpass']
    dbhost = config['dbhost']
    dbport = config['dbport']

    url = "jdbc:postgresql://"+dbhost+":"+str(dbport)+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass,
        "stringtype":"unspecified"
    }
    df.write.jdbc(url=url, 
      table=dbtable, 
      mode="append",
      properties=properties)
    
# Write to Postgres
query = filtered_data \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .foreachBatch(postgres_sink) \
    .start()


    
query.awaitTermination()