# SparkSQL assignment with Python
## José Vicente Mellado

### Reading file

In [None]:
## No need to run these lines, everything has been configured in spark-env.sh

#import findspark
#findspark.init('/spark_dir')

##Configuramos el sparksession
#import pyspark
#from pyspark.sql import SparkSession

#spark = (SparkSession.builder
#         .master('local[*]')
#         .config('spark.driver.cores', 1)
#         .appName('estudio_spark')
#         .getOrCreate()
#        )
##obtenemos el sparkcontext a partir del sparksession
#sc = spark.sparkContext

In [1]:
from pyspark.sql.types import *

st = StructType([
        StructField('ID', LongType(), True),
        StructField('PARENT-SYS-ID', StringType(), True),
        StructField('Source', StringType(), True),
        StructField('Mentions', StringType(), True),
        StructField('Target', StringType(), True),
        StructField('NAME Source', StringType(), True),
        StructField('BODY', StringType(), True),
        StructField('PUBDATE', TimestampType(), True),
        StructField('URLs coma separated', StringType(), True),
        StructField('Type TW-RT-MT', StringType(), True),
        StructField('LINK', StringType(), True),
        StructField('n1 Link', ByteType(), True),
        StructField('n1 Picture', ByteType(), True),
        StructField('PERSONAL-WEBSITE', StringType(), True),
        StructField('COUNTRY', StringType(), True),
        StructField('ALL-NICK-ACTIVITY-EVER', LongType(), True),
        StructField('NICK-FOLLOWERS', LongType(), True),
        StructField('FRIENDS-FOLLOWING-AUDIENCE', LongType(), True),
        StructField('LOCATION', StringType(), True)
    ]
)

#https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
df = spark.read.csv('tweets.csv', 
                    header=True, 
                    sep='\t',
                    schema=st,
                    timestampFormat='dd/MM/yyyy HH:mm',
                    mode='PERMISSIVE')

### a) Contabilizar el número total de menciones a los pilotos Marc Márquez, Valentino Rossi y Dani Pedrosa.



In [51]:
(df.select('Mentions')
 .filter(df['Mentions'].like('%marcmarquez93%') | 
         df['Mentions'].like('%valeyellow46%') | 
         df['Mentions'].like('%26_danipedrosa%'))
 .rdd.flatMap(lambda mentions: 
              filter(lambda x: x == 'marcmarquez93' or x == 'valeyellow46' or x == '26_danipedrosa', 
                     list(set(mentions[0].split(',')))
                    )
             )
 .map(lambda user: (user, 1))
 .reduceByKey(lambda a, b: a + b)
 .collect()
)

# Alternative query
#from pyspark.sql.functions import explode, split
#
#(df.select('ID', 'Mentions')
# .filter(df['Mentions'].like('%marcmarquez93%') | 
#         df['Mentions'].like('%valeyellow46%') | 
#         df['Mentions'].like('%26_danipedrosa%'))
# .withColumn('Mentions', explode((split('Mentions', ','))))
# .distinct()
# .filter('''Mentions = 'marcmarquez93' or 
#            Mentions = 'valeyellow46' or 
#            Mentions = '26_danipedrosa'
#         ''')
# .groupBy('Mentions')
# .count() 
# .show())

[('26_danipedrosa', 12341), ('marcmarquez93', 58027), ('valeyellow46', 61103)]

### b) Contabilizar los 5 países que más tweets han publicado (considerando los tweets que contengan dicha información).


In [15]:
(df.select('COUNTRY')
 .filter(df['COUNTRY'] != 'not public')
 .groupBy('COUNTRY')
 .count()
 .orderBy('count', ascending=False)
 .take(5)
)

[Row(COUNTRY='es', count=172577),
 Row(COUNTRY='us', count=12722),
 Row(COUNTRY='gb', count=12588),
 Row(COUNTRY='id', count=8725),
 Row(COUNTRY='it', count=1843)]

### c) Contabilizar los 3 hashtags más utilizados (que aparezcan el mayor número de veces) en el cuerpo de los tweets (campo "body").


In [52]:
(df.select('BODY')
 .filter(df['BODY'].like('%#%'))
 .rdd.flatMap(lambda mentions: 
              filter(lambda x: x.startswith('#'), 
                     list(set(mentions[0].split(' ')))
                    )
             )
 .map(lambda hashtag: (hashtag, 1))
 .reduceByKey(lambda a, b: a + b)
 .takeOrdered(3, key = lambda hashtag: -hashtag[1])
)

# Alternative query
#(df.select('ID', 'BODY')
# .filter(df['BODY'].like('%#%'))
# .withColumn('BODY', explode((split('BODY', ' '))))
# .distinct()
# .filter("BODY like '#%'")
# .groupBy('BODY')
# .count()
# .orderBy('count', ascending=False)
# .take(3)
#)

[('#motogp', 51911), ('#qatar', 9974), ('#moto3', 5793)]

# Spark Streaming

## Please run first the following lines: 



cd ./kafka_2.11-0.10.2.0/
./bin/zookeeper-server-start.sh ./config/zookeeper.properties

./bin/kafka-server-start.sh ./config/server.properties

cd ./sparksql-sparkstreaming-kafka

source activate conda_env_name

python ./timestamp_kafka_producer.py Quatar_GP_2014 ./tweets.csv 

### Note: tweets.csv is sorted by date

### The following lines were provided by the teachers

In [4]:
from __future__ import print_function
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

ssc = StreamingContext(sc, 5) # 5 seconds

kafkaBrokerIPPort = "127.0.0.1:9092"

import kafka

class KafkaProducerWrapper(object):
  producer = None
  @staticmethod
  def getProducer(brokerList):
    if KafkaProducerWrapper.producer != None:
      return KafkaProducerWrapper.producer
    else:
      KafkaProducerWrapper.producer = kafka.KafkaProducer(bootstrap_servers=brokerList, key_serializer=str.encode, value_serializer=str.encode)
      return KafkaProducerWrapper.producer
 
def sendMetrics(itr):
  prod = KafkaProducerWrapper.getProducer([kafkaBrokerIPPort])
  for m in itr:
    prod.send("metrics", key=m[0], value=m[0]+","+str(m[1]))
  prod.flush()

In [5]:
import ujson

topic_name = 'Quatar_GP_2014'

kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, [topic_name], kafkaParams)
stream = stream.map(lambda o: ujson.loads(o[1]))

### a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.


In [135]:
ssc.checkpoint("checkpoint")

counts = (stream.flatMap(lambda line: list(filter(lambda x: len(x) > 0, line['Mentions'].split(','))))
                .map(lambda user: (user, 1)) 
                .reduceByKey(lambda a, b: a + b))

counts.pprint()

# In case we wanted to send the result to another Kafka queue
#counts.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

In [131]:
ssc.start()

### b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.


In [31]:
ssc.checkpoint("checkpoint")

top_5 = (stream.flatMap(lambda line: filter(lambda word: word.startswith('#'), list(set(line['BODY'].split(' ')))))
 .map(lambda hashtag: (hashtag, 1))
 .updateStateByKey(lambda currentVal, totalVal: sum(currentVal) + totalVal if totalVal != None else sum(currentVal))
 .transform(lambda rdd: rdd.context.parallelize( 
                            rdd.takeOrdered(5, key = lambda hashtag: -hashtag[1])
                        ) 
           )
)

top_5.pprint()

# In case we wanted to send the result to another Kafka queue
#top_5.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

In [32]:
ssc.start()

-------------------------------------------
Time: 2017-06-16 22:01:00
-------------------------------------------
('#motogp', 5)
('#honda', 1)
('#alfaromeo', 1)
('#qatar', 1)
('#gasidolo', 1)

-------------------------------------------
Time: 2017-06-16 22:01:05
-------------------------------------------
('#motogp', 11)
('#vr46', 2)
('#staytogether', 2)
('#honda', 1)
('#alfaromeo', 1)

-------------------------------------------
Time: 2017-06-16 22:01:10
-------------------------------------------
('#motogp', 20)
('#qatar', 2)
('#vr46', 2)
('#staytogether', 2)
('#honda', 1)

-------------------------------------------
Time: 2017-06-16 22:01:15
-------------------------------------------
('#motogp', 26)
('#qatar', 3)
('#news', 2)
('#motorcycle', 2)
('#qatar!!', 2)

-------------------------------------------
Time: 2017-06-16 22:01:20
-------------------------------------------
('#motogp', 28)
('#vr46', 5)
('#staytogether', 5)
('#qatar', 3)
('#news', 2)

--------------------------------

### c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).


In [7]:
ssc.checkpoint("checkpoint")

windowed_type_count = (stream.map(lambda line: (line['Type TW-RT-MT'], 1))
 .reduceByKeyAndWindow(add, sub, 20, 10)
)

windowed_type_count.pprint()

# In case we wanted to send the result to another Kafka queue
#windowed_type_count.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

In [8]:
ssc.start()

-------------------------------------------
Time: 2017-06-18 16:41:40
-------------------------------------------
('MT', 4476)
('TW', 3610)
('RT', 12841)

-------------------------------------------
Time: 2017-06-18 16:41:50
-------------------------------------------
('MT', 4488)
('TW', 3628)
('RT', 12875)

-------------------------------------------
Time: 2017-06-18 16:42:00
-------------------------------------------
('MT', 26)
('TW', 21)
('RT', 83)

-------------------------------------------
Time: 2017-06-18 16:42:10
-------------------------------------------
('MT', 28)
('TW', 15)
('RT', 87)

-------------------------------------------
Time: 2017-06-18 16:42:20
-------------------------------------------
('MT', 32)
('TW', 25)
('RT', 77)

-------------------------------------------
Time: 2017-06-18 16:42:30
-------------------------------------------
('MT', 30)
('TW', 32)
('RT', 78)

-------------------------------------------
Time: 2017-06-18 16:42:40
----------------------------

## Stop streaming execution

In [33]:
ssc.stop(False)

-------------------------------------------
Time: 2017-06-17 04:42:15
-------------------------------------------
('#motogp', 39757)
('#qatar', 7013)
('#moto3', 5263)
('#losail', 2787)
('#motogpenabierto', 2631)

