# Spark Consumer

TODO: 

- Find out how to send the output to 2 places: save JSON to HDFS and a table to Hive

- Why does one output sometimes contain 2 tweets, like:
<pre>
+----+--------------------+  
| key|               value|  
+----+--------------------+  
|null|"@SenWarren Need ...|  
|null|"RT @TrinityResis...|  
+----+--------------------+  
</pre>
- How to consume everything currently in a Kafka topic, from the beginning up until now (like a cursor)  
- How to cope with escape sequences like  <pre>\\ud83c\\uddf5\\ud83c\\uddf8\\u2764\\</pre> being written instead of the real characters (e.g. emoji such as 🇵🇸❤, or Arabic text): https://stackoverflow.com/questions/18337407/saving-utf-8-texts-in-json-dumps-as-utf8-not-as-u-escape-sequence


- How to dump data to HDFS in large Parquet files

In [None]:
from __future__ import print_function

import sys, os
from time import time, sleep
from itertools import cycle
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, split, from_json, flatten, udf

from common_vars import *

In [None]:
# The Kafka source for Structured Streaming is not bundled with PySpark, so ask
# spark-submit to fetch the connector package. This has to run before the
# SparkSession cell below creates the session.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell',
)

In [None]:
# Sanity check: assembleStructType pairs keys with types by position, so each
# key list must be as long as its type list (compare the printed lengths).
print(len(tweet_keys), len(tweet_types), len(user_keys), len(user_types), sep='\n')

In [None]:
def poll_continiously(Query, period=.35, attrname='status', error_log=None):
    """Repeatedly print one attribute of a running query on a single line.

    Every ``period`` seconds, ``getattr(Query, attrname)`` is read. Its repr,
    truncated to 100 characters and prefixed by a spinner character, is
    printed after a carriage return so it overwrites the previous line.
    Polling stops on Ctrl-C (KeyboardInterrupt).

    Args:
        Query: object to poll, e.g. a Spark ``StreamingQuery``.
        period: seconds to sleep between polls.
        attrname: name of the attribute to read (``'status'`` by default).
        error_log: optional list. Any other exception raised while polling is
            appended to it, and polling then continues.
    """
    spinner = cycle("⠁⠂⠄⡀⢀⠠⠐⠈")
    # Trailing blanks wipe leftovers of a longer previously printed line.
    padding = ' ' * 100
    while True:
        try:
            sleep(period)
            val = getattr(Query, attrname)
            print('\r' + next(spinner) + repr(val)[:100] + padding, end='')
        except KeyboardInterrupt:
            print('\rSTOPPED polling (KeyboardInterrupt)' + padding, end='')
            break
        except Exception as e:
            # Record the error locally and keep polling. Do not touch
            # notebook globals, which may not exist when this runs.
            if error_log is not None:
                error_log.append(e)
            print("oops:", repr(e)[:100])

In [None]:
#spark.stop()

In [None]:
# Create the notebook's SparkSession, or reuse an existing one.
spark = (
    SparkSession.builder
    .appName("StructuredTwitterJsonArchive")
    .getOrCreate()
)

In [None]:
def assembleStructType(ST, keys, types):
    """Add one field per (key, type) pair to a Spark StructType.

    ``types`` holds type *classes* (e.g. ``StringType``). Each one is
    instantiated with no arguments before being passed to ``ST.add``.

    Args:
        ST: the struct to extend via its ``add`` method.
        keys: field names, in order.
        types: field type classes, parallel to ``keys``.

    Returns:
        The value returned by the last ``add`` call, or ``ST`` itself when
        ``keys`` is empty.

    Raises:
        ValueError: if ``keys`` and ``types`` have different lengths.
    """
    if len(keys) != len(types):
        raise ValueError(
            f"keys and types must have the same length "
            f"({len(keys)} != {len(types)})")
    # Iterative so long schemas cannot hit the recursion limit, and an empty
    # key list simply returns ST unchanged.
    for key, field_type in zip(keys, types):
        ST = ST.add(key, field_type())
    return ST

# Smoke check that the tweet schema builds from tweet_keys / tweet_types
# (presumably provided by common_vars).
test = assembleStructType(ST=StructType(), keys=tweet_keys, types=tweet_types)



In [None]:
# Schema of the Kafka message value (parsed with from_json below): a JSON
# object with two nested objects,
#   "users"  -> fields from user_keys / user_types
#   "tweets" -> fields from tweet_keys / tweet_types
# NOTE(review): an older comment named these keys "user" / "tweet"; confirm
# the field names against the Kafka producer.
user_struct = assembleStructType(StructType(), user_keys, user_types)
tweet_struct = assembleStructType(StructType(), tweet_keys, tweet_types)

schema = (
    StructType()
    .add("users", user_struct)
    .add("tweets", tweet_struct)
)

print(schema)

In [None]:
# Streaming DataFrame over the Kafka topic: `key` cast to string, and `value`
# parsed from JSON using `schema`.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    # See `failOnDataLoss` in the Spark Kafka integration guide.
    .option("failOnDataLoss", "false")
    .load()
    .select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schema).alias('value'),
    )
)


## Raw JSON Data to archive 

In [None]:
# Archive the parsed messages to HDFS as JSON, one directory per Kafka key.
targetJsonHDFS = (
    df.select(col('key'), col('value.*'))
    .writeStream
    .format("json")
    .outputMode("append")
    .partitionBy("key")
    .option("path", "hdfs://localhost:8020" + hdfs_archive_path)
    .option("checkpointLocation", "hdfs://localhost:8020" + hdfs_archive_checkpoint_path)
    .start()
)
# When running as a plain script, block here with:
# targetJsonHDFS.awaitTermination()

In [None]:
# Live-print the archive query's status until interrupted (Ctrl-C).
poll_continiously(targetJsonHDFS)

In [None]:
# Print the query's execution plan.
targetJsonHDFS.explain()

In [None]:
# The exception that terminated the query, if any.
targetJsonHDFS.exception()

In [None]:
targetJsonHDFS.stop()

## Tweets to HIVE

In [None]:

def Sentiment(sent):
    """Return TextBlob's polarity of ``sent`` scaled by 100 and truncated to int.

    Used as a Spark UDF. ``textblob`` is imported inside the function so the
    import happens wherever the UDF runs. A null column value arrives as
    ``None`` and is returned as ``None`` (SQL null). TextBlob rejects
    non-string input, and that would otherwise fail the whole batch.
    """
    if sent is None:
        return None
    from textblob import TextBlob
    return int(TextBlob(sent).sentiment.polarity * 100)

# Spark UDF wrapper around Sentiment(): text column -> IntegerType column.
Sentiment_udf = udf(f=Sentiment, returnType=IntegerType())


In [None]:

def n_words(sent):
    """Return the number of words TextBlob finds in ``sent``.

    Used as a Spark UDF. ``None`` (a SQL null) is returned unchanged instead of
    being handed to TextBlob, which rejects non-string input.
    """
    if sent is None:
        return None
    from textblob import TextBlob
    return len(TextBlob(sent).words)
n_words("hello how are you")  # driver-side check that textblob imports and runs

n_words_udf = udf(f=n_words, returnType=IntegerType())


In [None]:
# Stream tweets to HDFS as Parquet, one micro-batch (small files) per minute,
# partitioned by Kafka key. Each tweet gets Sentiment and n_words columns, and
# only tweets with more than 10 words are kept.
tweets2HIVE = (
    df.select(col('key'), col('value.tweets.*'))
    .withColumn('Sentiment', Sentiment_udf('text'))
    .withColumn('n_words', n_words_udf('text'))
    .where("n_words > 10")
    .writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy("key")
    .trigger(processingTime="1 minutes")
    .option("path", "hdfs://localhost:8020" + hdfs_hive_tweets)
    .option("checkpointLocation", "hdfs://localhost:8020" + hdfs_hive_tweets + "/sparkcheckpoint")
    .start()
)

In [None]:
# Live-print the tweets query's status until interrupted (Ctrl-C).
poll_continiously(tweets2HIVE)

In [None]:
# Print the query's execution plan.
tweets2HIVE.explain()

In [None]:
# The exception that terminated the query, if any.
tweets2HIVE.exception()

In [None]:
tweets2HIVE.stop()

## Users to HIVE

In [None]:
# Stream user records to the users staging area on HDFS as Parquet. The Kafka
# key is kept as `last_tweet_at`, and output is partitioned by `partitionCol`.
# NOTE(review): dropDuplicates on a stream without a watermark keeps every id
# ever seen in state, so the state grows without bound. It also keeps the
# first record per id, not the latest one.
users2HIVE = (
    df.select(col('key').alias('last_tweet_at'), col('value.users.*'))
    .dropDuplicates(subset=['id'])
    .writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy(partitionCol)
    .option("path", "hdfs://localhost:8020" + hdfs_hive_users_staging)
    .option("checkpointLocation", "hdfs://localhost:8020" + hdfs_hive_users_staging + "/sparkcheckpoint")
    .start()
)

In [None]:
# Live-print the users query's status until interrupted (Ctrl-C).
poll_continiously(users2HIVE)

In [None]:
# Print the query's execution plan.
users2HIVE.explain()

In [None]:
# The exception that terminated the query, if any.
users2HIVE.exception()

In [None]:
users2HIVE.stop()

## HIVE refresher job for tweets and users

In [None]:
def refresh_hive_table(df, epoch_id):
    """foreachBatch callback that runs MSCK REPAIR on the Hive tables.

    ``MSCK REPAIR TABLE`` registers partition directories that exist on HDFS
    but are missing from the metastore. This lets Hive see the partitions the
    streaming sinks create. ``df`` and ``epoch_id`` are required by
    ``foreachBatch`` but unused.
    """
    from contextlib import closing
    from pyhive import hive
    # NOTE(review): credentials are hard-coded; consider moving them to config.
    # closing() releases the HiveServer2 connection (and its session) after
    # every batch instead of leaking one connection per trigger.
    with closing(hive.Connection(
            host='localhost',
            port=10000,
            username='hdfs',
            password='naya',
            auth='CUSTOM',
            database='twitter')) as hive_cnx:
        with hive_cnx.cursor() as cursor:
            cursor.execute(" MSCK REPAIR TABLE tweets ")
            cursor.execute(" MSCK REPAIR TABLE users ")


In [None]:
# Use the Kafka stream as a clock. At most once a minute (when a batch runs),
# refresh_hive_table repairs the Hive tables; the batch contents are unused.
HIVErefresher = (
    df.writeStream
    .foreachBatch(refresh_hive_table)
    .trigger(processingTime='1 minute')
    .start()
)

In [None]:
# Live-print the refresher's status (polling a bit faster than the default).
poll_continiously(HIVErefresher, period=.2)

In [None]:
HIVErefresher.status

In [None]:
HIVErefresher.stop()

# Make users unique by 4-step aggregation query (does not work)

In [None]:
       

def assemble_queries(year, partition_col=None, col_names=None):
    """Build the Hive statements that de-duplicate ``users`` for one partition.

    Steps, in order:
      1. session settings for partitioned inserts;
      2. ``MSCK REPAIR TABLE users_staging``;
      3. copy partition ``year`` of ``users`` into ``users_staging``;
      4. overwrite that partition of ``users`` with exactly one row per ``id``,
         namely the row with the latest ``last_tweet_at``;
      5. drop the partition from ``users_staging``.

    Args:
        year: partition value, e.g. ``'201703'``. It is interpolated into the
            SQL unquoted, so it must come from a trusted source.
        partition_col: partition column name; defaults to the notebook-global
            ``partitionCol``.
        col_names: columns copied in step 3; defaults to the notebook-global
            ``colNames``.

    Returns:
        list of str: the statements, to be run sequentially.
    """
    if partition_col is None:
        partition_col = partitionCol
    if col_names is None:
        col_names = colNames

    # No trailing ';' on the SET statements. Each statement is sent on its
    # own, and the ';' would otherwise become part of the value (e.g. "true;").
    # NOTE(review): confirm against the HiveServer2 version in use.
    queries = [
        "SET hive.exec.dynamic.partition.mode=nonstrict",
        "SET hive.exec.dynamic.partition = true",
        "SET hive.mapred.mode = nonstrict",
        "SET hive.optimize.sort.dynamic.partition=false",
        "SET spark.sql.hive.convertMetastoreParquet=false",
        "MSCK REPAIR TABLE users_staging",
    ]

    cols = ', '.join(col_names)

    queries.append(f"insert into users_staging partition({partition_col}={year}) select {cols} from users where {partition_col}={year}")

    # Keep the most recent row per user id. row_number() (not rank()) ensures
    # that ties on last_tweet_at cannot leave duplicate ids behind.
    q42 = f"""
    SELECT last_tweet_at, id, name, screen_name, created_at, `location` , url, protected, verified, followers_count, friends_count,
           listed_count, favourites_count, statuses_count, withheld_in_countries
    FROM (
        SELECT last_tweet_at, id, name, screen_name, created_at, `location` , url, protected, verified, followers_count, friends_count,
           listed_count, favourites_count, statuses_count, withheld_in_countries,
           row_number() over (partition by id order by last_tweet_at desc) as last_rank
        from users_staging
        where {partition_col} ={year}
    ) ranked_users
    WHERE ranked_users.last_rank = 1
    """

    queries.append(f"insert overwrite table users partition({partition_col}={year}) {q42}")

    queries.append(f"alter table users_staging drop partition ({partition_col}={year})")
    return queries

assemble_queries(year='201703')  # preview the statements for one partition

In [None]:
# Find the partitions currently present in users_staging, then build an
# endless iterator over them for the merge loop.
queries = [
    "MSCK REPAIR TABLE users_staging",
    "select created_ym, count(created_ym) from users_staging group by created_ym",
]
res = run_queries(queries)

years = cycle([row[0] for row in res])
years

In [None]:
# Failures from the merge loop, as [year, exception] pairs.
errors = []

In [None]:
# Merge loop: cycles through `years` forever, running assemble_queries(year)
# for each partition. Failures are recorded in `errors` and the loop moves on;
# Ctrl-C while queries run stops it.
# NOTE(review): `'|' + year` runs outside the try and assumes partition
# values are strings; anything else raises TypeError and ends the loop.
while True:    
    year = next(years)
    print('|' + year , end = '')
    try:
        queries = assemble_queries(year)
        run_queries(queries)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
        break    
    except Exception as e :
        errors.append([year,e])
        print("oops")
    

In [None]:
# Inspect recorded failures.
errors

In [None]:
# One-shot variant of assemble_queries: merge every partition of `users` at
# once using dynamic partitioning, instead of one `year` at a time.
year = '201703'

# No trailing ';' on the SET statements. Each statement is sent on its own,
# and the ';' would otherwise become part of the value (e.g. "true;").
# NOTE(review): confirm against the HiveServer2 version in use.
queries = []

queries.append("SET hive.exec.dynamic.partition.mode=nonstrict")
queries.append("SET hive.exec.dynamic.partition = true")
queries.append("SET hive.mapred.mode = nonstrict")
queries.append("SET hive.optimize.sort.dynamic.partition=false")
queries.append("SET spark.sql.hive.convertMetastoreParquet=false")
queries.append("SET hive.exec.stagingdir=.hive-staging")
queries.append("MSCK REPAIR TABLE users_staging")

cols = ', '.join(colNames)

queries.append(f"insert into users_staging partition({partitionCol}) select {cols} from users")

#queries.append(f"alter table users drop partition ({partitionCol}={year})")

# Keep the most recent row per user id. row_number() (not rank()) ensures
# that ties on last_tweet_at cannot leave duplicate ids behind.
q42 = """
SELECT last_tweet_at, id, name, screen_name, created_at, `location` , url, protected, verified, followers_count, friends_count,
       listed_count, favourites_count, statuses_count, withheld_in_countries, created_ym
FROM (
    SELECT last_tweet_at, id, name, screen_name, created_at, `location` , url, protected, verified, followers_count, friends_count,
       listed_count, favourites_count, statuses_count, withheld_in_countries, created_ym,
       row_number() over (partition by id order by last_tweet_at desc) as last_rank
    from users_staging
) ranked_users
WHERE ranked_users.last_rank = 1
"""

queries.append(f"insert overwrite table users partition({partitionCol}) {q42}")

#queries.append(f"alter table users_staging drop partition ({partitionCol}={year})")

print(queries[-1])



## Spark SQL Table

In [None]:
# Debug sink: keep the parsed tweet fields in an in-memory table named
# "aggregate2" that can be queried with spark.sql.
# The struct field in `schema` is "tweets" (as used by the other sinks);
# selecting "value.tweet.*" fails because that field does not exist.
dbg = (
    df.select(col("value.tweets.*"))
    .writeStream
    .queryName("aggregate2")
    .outputMode("append")
    .format("memory")
    .start()
)

In [None]:
# Stop the in-memory debug sink.
dbg.stop()

In [None]:
a = spark.sql("select * from aggregate2") # interactively query in-memory table

In [None]:
# NOTE(review): DataFrameWriter.parquet() returns None, so `b` is always None.
# With the default save mode this fails if the path already exists.
b  = a.write.parquet("hdfs://localhost:8020/tmp/project/tmp.parquet")

In [None]:
a.show()

In [None]:
# Show up to 10 rows, truncating each column to 200 characters.
a.show(10,200)


In [None]:
# Read back one hard-coded Parquet part file (presumably written by the
# tweets sink) to inspect its contents.
file = "/tmp/project/hive/key=2019-12-25_18-15/part-00000-c5543d3a-7e1d-467d-a0d7-9b8893fe9bb4.c000.snappy.parquet"
c = spark.read.parquet('hdfs://localhost:8020'+file)

In [None]:
c.head()

In [None]:
c.show()

In [None]:
# Show up to 100 rows, truncating each column to 1100 characters.
spark.sql("select * from aggregate2").show(100,1100) # interactively query in-memory table

In [None]:
a.count()  # Spark DataFrames have no len(); count() returns the number of rows

In [None]:
# Display the DataFrame object; its repr lists the column names and types.
a

## Sandbox
Below are cells I played with to test portions of the code.


In [None]:

# Spark reader options for Twitter's `created_at` format, e.g.
# "Wed Oct 10 20:19:24 +0000 2018". Spark expects a Java date pattern here,
# not a Python strftime pattern such as "%a %b %d %H:%M:%S %z %Y".
options = {"timestampFormat": "EEE MMM dd HH:mm:ss Z yyyy"}
options


In [None]:
df.printSchema()

In [None]:
 schema = StructType().add("value", StructType().add("user", StringType())
                                            .add("tweet", StringType()))

In [None]:
schema

In [None]:
 schema = StructType().add("value", StructType().add("value", StringType()))

In [None]:
schema

In [None]:
from pyspark.sql.types import StructType, StringType, MapType, StructField


schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
schema

In [None]:
schema = StructType([StructField("value", StringType(), True), True),])

In [None]:
schema

In [None]:
schema = StructType([
    StructField("created_at", StringType(), True),
    StructField("id", StringType(), True),
    StructField("text", StringType(), True),
])

In [None]:
# Batch-read the sample file with whatever `schema` was defined last.
# NOTE(review): `sampleFilePath` is assigned in a later cell; run that first.
data = spark.read.json(sampleFilePath, schema, multiLine=True)
print(data)

In [None]:
wat = data.collect()

In [None]:
wat[0].asDict()

In [None]:
# Same file again, this time letting Spark infer the schema.
wat = spark.read.option("multiLine", True).json(sampleFilePath)

In [None]:
wat.collect()

In [None]:
jsondf = spark.read.json(Seq(jsonstr).toDS) 

In [None]:
wat = schema.collect()

In [None]:
type(wat[0])

In [None]:
sampleFilePath = "/home/naya/DataEngineerProject/part-00000-7ad71167-a959-4478-929a-6ab7607844d4.c000.json"
sampleFilePath

In [None]:


with open(sampleFilePath) as f:
    txt = f.read()
    
(txt.__repr__())

json.loads(txt.replace('\\"','\"'))

In [None]:
sampleFilePath = "hdfs://localhost:8020" +\
                "/tmp/project/archive/key=2019-12-22_10-18/part-00000-7ad71167-a959-4478-929a-6ab7607844d4.c000.json"
sampleFilePath

In [None]:
from pyspark.sql import SQLContext
sf = sqlContext.read.json(sampleFilePath)

In [None]:
# json to HDFS sink with trigger

targetJsonHDFS = df\
    .writeStream\
    .format("json")\
    .outputMode("append")\
    .option("path", "hdfs://localhost:8020" + hdfs_archive_path)\
    .trigger(processingTime="5 seconds")\
    .option("checkpointLocation", "hdfs://localhost:8020" + hdfs_archive_checkpoint_path)\
    .start()
targetJsonHDFS.awaitTermination()

In [None]:
targetJsonHDFS.stop()

In [None]:
# parquet to HDFS sink example with trigger

targetParquetHDFS = df\
    .writeStream\
    .format("parquet")\
    .outputMode("append")\
    .option("path", "hdfs://localhost:8020/tmp/project")\
    .trigger(processingTime="5 seconds")\
    .option("checkpointLocation", "hdfs://localhost:8020/tmp/sparkcheckpoint/")\
    .start()
targetParquetHDFS.awaitTermination()

In [None]:
targetParquetHDFS.stop()

In [None]:
# Sandbox: Parquet sink to /tmp/project. Scheme-less paths resolve against
# Spark's default filesystem, so this is the local disk only when that is
# file://. awaitTermination() blocks this cell until the query stops.
query = (
    df.writeStream
    .format("parquet")
    .option("path", "/tmp/project")
    .option("checkpointLocation", "/tmp/sparkcheckpoint/")
    .outputMode("append")
    .start()
)
query.awaitTermination()

In [None]:
#write parquet files to local linux dir
query = df \
    .writeStream \
    .format("parquet")       \
    .option("path", "/tmp/project")\
    .option("checkpointLocation", "/tmp/sparkcheckpoint/")\
    .outputMode("append")\
    .start()
query.awaitTermination()

In [None]:
query.stop()

In [None]:
#output to console
# query = df \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()
# query.awaitTermination()

In [None]:
# words = df.select(
#    explode(
#        split(df.value, " ")
#    ).alias("word")
# )

# Generate running word count
#wordCounts = words.groupBy("word").count()

In [None]:
# query = wordCounts \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()
# query.awaitTermination()