### Load Data

#### Connect to Spark & Elasticsearch, gather raw data

In [1]:
import sys
# Install the third-party packages used below into the interpreter that backs this
# kernel (sys.executable), so they end up in the environment the next cells import from.
# NOTE(review): versions are not pinned — consider pinning them for reproducible runs.
!{sys.executable} -m pip install tzwhere pytz findspark plotly



In [2]:
import os

import findspark

# The elasticsearch-hadoop connector jar must be handed to the JVM at launch time,
# so the submit arguments are set before any SparkContext is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars jars/elasticsearch-spark-20_2.11-6.5.1.jar pyspark-shell'

# findspark.init() is what makes pyspark importable, so it has to run BEFORE the
# pyspark imports below (calling it afterwards cannot help a failing import).
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

# getOrCreate() reuses a running context, so re-running this cell does not fail
# with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("esAnalytics"))
sqlContext = SQLContext(sc)

# The session is built on top of the context created above; cross joins are
# enabled because later cells pair every topic with the overall distribution.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("meetup") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .getOrCreate()

In [3]:
from json import dumps, loads

# Connection settings handed to the elasticsearch-hadoop input format.
es_read_conf = {
    "es.nodes": 'elastic',
    "es.port": '9200',
    "es.resource": 'meetup-rawdata-*/default',
}

# Every record is a (key, document) pair; only the document (index 1) is kept.
# The dumps/loads round-trip presumably normalises it to plain Python containers
# before schema inference — TODO confirm.
raw_data = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
).map(lambda record: loads(dumps(record[1])))

df = sqlContext.createDataFrame(raw_data)



#### Select only last response for each rsvp_id

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Newest modification first inside each rsvp_id, so row number 1 is the latest response.
rsvpWindowSpec = (
    Window
    .partitionBy(df["rsvp_id"])
    .orderBy(df["mtime"].desc())
)

# Keep only the latest row per rsvp_id, then sort the result by rsvp_id.
df = (
    df
    .withColumn("rowId", row_number().over(rsvpWindowSpec))
    .where("rowId = 1")
    .orderBy("rsvp_id")
)

#### Establish timezone, day_of_week_local, hour_local, minute_local of event.event_time based on venue.venue_geo

In [None]:
import pytz

from datetime import datetime
from tzwhere import tzwhere

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType

# Cache for the tzwhere lookup object. Constructing tzwhere() is expensive, so it
# is built lazily on first use and then reused instead of being rebuilt per row.
# NOTE(review): the cache lives with the deserialised copy of this function on the
# Python worker — confirm the reuse scope for your Spark deployment.
_TZWHERE_CACHE = {}


def _get_tzwhere():
    """Return the shared tzwhere instance, creating it on first use."""
    if "instance" not in _TZWHERE_CACHE:
        _TZWHERE_CACHE["instance"] = tzwhere.tzwhere()
    return _TZWHERE_CACHE["instance"]


@udf(StringType())
def udf_timezone_by_geo(lat, lon):
    """Resolve the timezone name for a coordinate pair.

    Args:
        lat: latitude, anything accepted by float().
        lon: longitude, anything accepted by float().

    Returns:
        Whatever tzwhere's tzNameAt() returns for the point, or None when
        either coordinate is missing.
    """
    # Venues without coordinates cannot be located; yield null instead of failing the job.
    if lat is None or lon is None:
        return None

    return _get_tzwhere().tzNameAt(float(lat), float(lon))

@udf(ArrayType(StringType()))
def udf_localize_with_timezone(utc_time, timezone_str):
    """Split a UTC epoch timestamp into local calendar parts for a timezone.

    Args:
        utc_time: epoch timestamp in seconds, or in milliseconds (13 digits).
        timezone_str: timezone name understood by pytz, e.g. 'Europe/Warsaw'.

    Returns:
        [year, month, day, isoweekday, hour, minute] in local time, each as a
        string (the declared element type), or None when either input is
        missing or the timezone name is blank.
    """
    # The timezone lookup yields null for unresolved venues; propagate null
    # instead of crashing on .strip() / int().
    if utc_time is None or timezone_str is None:
        return None

    timezone_str = timezone_str.strip()
    if not timezone_str:
        return None

    epoch_utc = int(utc_time)

    # check if provided in ms or s:
    if len(str(epoch_utc)) == 13:
        epoch_utc = epoch_utc / 1000

    tz = pytz.timezone(timezone_str)

    # Build a timezone-AWARE UTC datetime before converting. Calling astimezone()
    # on a naive datetime makes Python assume the system's local timezone, which
    # gives a wrong offset on any worker that does not run in UTC.
    local_dt = datetime.fromtimestamp(epoch_utc, tz=pytz.utc).astimezone(tz)

    # Order matters: downstream cells index this array positionally
    # (0=year, 1=month, 3=isoweekday).
    parts = (local_dt.year,
             local_dt.month,
             local_dt.day,
             local_dt.isoweekday(),
             local_dt.hour,
             local_dt.minute)

    # Emit strings explicitly so the values match the declared ArrayType(StringType()).
    return [str(part) for part in parts]

In [None]:
# to optimize matching event.time with venue.lat/venue.lon create dict with distinct venues

from pyspark.sql.functions import concat, lit

# One row per distinct venue coordinate pair: a "<lat>_<lon>" lookup key plus the
# timezone resolved for that point, so the UDF runs once per venue.
venueGeoDict = (
    df
    .select(col("venue.lat"), col("venue.lon"))
    .distinct()
    .withColumn("key", concat(col("lat"), lit("_"), col("lon")))
    .withColumn("event_timezone", udf_timezone_by_geo(col("lat"), col("lon")))
    .select(col("key"), col("event_timezone"))
)

venueGeoDict.show(5)

+-----------------+--------------+
|              key|event_timezone|
+-----------------+--------------+
|52.22977_21.01178| Europe/Warsaw|
+-----------------+--------------+



In [None]:
# Attach the timezone to every RSVP by rebuilding the "<lat>_<lon>" key on the event side.
# NOTE(review): join type 'cross' is combined with a join condition here — presumably
# this behaves like an inner join on the key; confirm.
dfWithEventTimezone = df.join(
    venueGeoDict,
    concat(col("venue.lat"), lit("_"), col("venue.lon")) == venueGeoDict.key,
    'cross',
)

# Add the localized calendar parts of event.time as an array column.
dfWithLocalizedEventTime = dfWithEventTimezone.withColumn(
    "event_time_localized",
    udf_localize_with_timezone(
        dfWithEventTimezone.event.time,
        dfWithEventTimezone.event_timezone,
    ),
)

dfWithLocalizedEventTime.cache()

DataFrame[event: map<string,string>, group: map<string,array<map<string,string>>>, guests: double, member: map<string,double>, mtime: bigint, response: string, rsvp_id: double, venue: map<string,string>, visibility: string, rowId: int, key: string, event_timezone: string, event_time_localized: array<string>]

In [None]:
# Preview: raw event time next to its localized parts and the resolved timezone.
(
    dfWithLocalizedEventTime
    .select(col("event.time"), col("event_time_localized"), col("event_timezone"))
    .show(5)
)

+-------------+--------------------+--------------+
|         time|event_time_localized|event_timezone|
+-------------+--------------------+--------------+
|1544217874000|[2018, 12, 7, 5, ...| Europe/Warsaw|
|1544239170000|[2018, 12, 8, 6, ...| Europe/Warsaw|
|1544257659000|[2018, 12, 8, 6, ...| Europe/Warsaw|
|1544242550000|[2018, 12, 8, 6, ...| Europe/Warsaw|
|1544235224000|[2018, 12, 8, 6, ...| Europe/Warsaw|
+-------------+--------------------+--------------+
only showing top 5 rows



### Analyze

#### Calculate & visualize most distinguishable distributions of meetings in particular day_of_week_local by tag (Jensen–Shannon divergence)

In [None]:
# total distribution

from pyspark.sql.functions import lit, count, udf, collect_list, sort_array, struct
from pyspark.sql.types import StringType, DoubleType, MapType

# Single partition spanning all rows: lets every row carry the overall RSVP count.
countByAllWindowSpec = Window.partitionBy(lit(1))

totalWeekdayDistribution = (
    dfWithLocalizedEventTime
    # index 3 of the localized parts is the ISO weekday
    .withColumn("event_isoweekday", dfWithLocalizedEventTime.event_time_localized[3])
    .select(col("rsvp_id"), col("event_isoweekday"), count(col("rsvp_id")).over(countByAllWindowSpec).alias("weekday_total_share"))
    .groupBy(col("event_isoweekday"), col("weekday_total_share"))
    .count()
    .withColumn("total_dist", col("count")/col("weekday_total_share"))
    .groupBy()
    # collect_list() gives no ordering guarantee after a shuffle, even behind an
    # orderBy(). Collect (weekday, share) structs and sort the array by weekday so
    # position i always means the same weekday, then keep only the shares.
    .agg(sort_array(collect_list(struct(col("event_isoweekday"), col("total_dist")))).alias("weekday_dist"))
    .select(col("weekday_dist.total_dist").alias("total_dist"))
)

totalWeekdayDistribution.cache()

DataFrame[total_dist: array<double>]

In [None]:
# Bring the result to the driver to inspect the overall weekday shares.
totalWeekdayDistribution.collect()

[Row(total_dist=[0.11764705882352941, 0.09411764705882353, 0.1264705882352941, 0.11666666666666667, 0.10490196078431373, 0.21176470588235294, 0.2284313725490196])]

In [None]:
# distribution by group topic
from pyspark.sql.functions import explode, lower, coalesce, abs, sort_array, struct
from pyspark.sql.types import Row

countByTopicWindowSpec = Window.partitionBy("group_topic")

# One row per (weekday, topic) with the RSVP count for that pair and the
# total RSVP count of the topic (weekday_topic_share).
topicWeekdayDistributionTmp = (
    dfWithLocalizedEventTime
    .withColumn("event_isoweekday", dfWithLocalizedEventTime.event_time_localized[3])
    .select(col("rsvp_id"), col("event_isoweekday"), explode(col("group.group_topics")).alias("group_topic_map"))
    .withColumn("group_topic", col("group_topic_map").getItem("urlkey"))
    .withColumn("weekday_topic_share", count("rsvp_id").over(countByTopicWindowSpec))
    .drop("group_topic_map")
    .groupBy(col("event_isoweekday"), col("group_topic"), col("weekday_topic_share"))
    .count()
    .orderBy(col("group_topic"), col("event_isoweekday"))
)

# Full (weekday x topic) grid with a zero count, used to pad missing weekdays.
topics = topicWeekdayDistributionTmp.select(col("group_topic").alias("group_topic_tmp")).distinct()
weekdays = sc.parallelize(range(1, 8)).map(lambda day: Row(event_isoweekday_tmp=str(day))).toDF()
cross = weekdays.crossJoin(topics).withColumn("count_tmp", lit(0))

# ensure that every topic has entry for every weekday (even if no meetings took place on that weekday)
topicWeekdayDistribution = (
    cross
    .join(topicWeekdayDistributionTmp, (topicWeekdayDistributionTmp.event_isoweekday == cross.event_isoweekday_tmp) & (topicWeekdayDistributionTmp.group_topic == cross.group_topic_tmp), how='outer')
    .withColumn("event_isoweekday", col("event_isoweekday_tmp"))
    .withColumn("group_topic", col("group_topic_tmp"))
    .withColumn("count", coalesce("count", "count_tmp"))
    # padded rows have no share; -1 avoids a division by zero and abs() below
    # turns the resulting 0 / -1 into a plain 0 share
    .withColumn("weekday_topic_share", coalesce("weekday_topic_share", lit(-1)))
    .drop("event_isoweekday_tmp", "group_topic_tmp", "count_tmp")
    .withColumn("topic_dist", abs(col("count")/col("weekday_topic_share")))
    .groupBy("group_topic")
    # collect_list() gives no ordering guarantee after a shuffle, even behind an
    # orderBy(). Collect (weekday, share) structs and sort by weekday so every
    # topic's array is aligned position-by-position, then keep only the shares.
    .agg(sort_array(collect_list(struct(col("event_isoweekday"), col("topic_dist")))).alias("weekday_dist"))
    .select(col("group_topic"), col("weekday_dist.topic_dist").alias("topic_dist"))
)

topicWeekdayDistribution.cache()

DataFrame[group_topic: string, topic_dist: array<double>]

In [None]:
# JS Divergence UDF

from numpy import asarray, e
from scipy import stats

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def udf_jsd(p, q, base=e):
    '''
        Implementation of pairwise `jsd` based on  
        https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence

        p, q: sequences of non-negative weights of equal length; each is
              normalized to a probability distribution before comparison.
        base: logarithm base passed to scipy.stats.entropy.

        Returns the Jensen-Shannon divergence as a float, or the sentinel -1.0
        when the inputs cannot be compared (missing, different lengths,
        non-numeric, or not summing to a positive number).
    '''
    # The sentinel is a float on purpose so it matches the declared DoubleType.
    invalid = -1.0

    if p is None or q is None:
        return invalid

    try:
        ## convert to np.array
        p, q = asarray(p, dtype=float), asarray(q, dtype=float)
    except (TypeError, ValueError):
        # non-numeric content
        return invalid

    # Reject mismatched lengths explicitly: numpy would silently broadcast a
    # length-1 array against a longer one and produce a meaningless score.
    if p.shape != q.shape:
        return invalid

    p_total, q_total = p.sum(), q.sum()
    # `not x > 0` also catches NaN totals, which would otherwise yield a NaN score.
    if not p_total > 0 or not q_total > 0:
        return invalid

    ## normalize p, q to probabilities
    p, q = p/p_total, q/q_total

    m = 1./2*(p + q)

    return float(stats.entropy(p, m, base=base)/2. + stats.entropy(q, m, base=base)/2.)

In [None]:
# Pair each topic's weekday distribution with the overall one, score the pair with
# the Jensen-Shannon divergence UDF and keep the 10 highest-scoring topics.
jsDivergence = (
    topicWeekdayDistribution
    .crossJoin(totalWeekdayDistribution)
    .withColumn("jsd", udf_jsd(col("topic_dist"), col("total_dist")))
    .orderBy(col("jsd").desc())
    .limit(10)
)

jsDivergence.cache()

DataFrame[group_topic: string, topic_dist: array<double>, total_dist: array<double>, jsd: double]

In [None]:
def plot_hist(data_list):
    """Draw one weekday bar chart per topic on a 3-column grid of subplots.

    Args:
        data_list: JSON strings, each an object with a 'group_topic' name and
            a 'topic_dist' list holding one share per ISO weekday (1..7).

    Returns:
        None. The figure is rendered inline; nothing is drawn for an empty list.
    """
    import plotly.graph_objs as go
    from plotly.offline import init_notebook_mode, iplot
    from math import ceil
    from plotly import tools
    from json import loads

    # ISO weekdays 1..7 — all seven, so the last weekday is not cut off the x axis.
    weekdays = list(range(1, 8))

    records = [loads(entry) for entry in data_list]
    if not records:
        # a subplot grid with zero rows cannot be built
        return

    init_notebook_mode(connected=True)

    n_cols = 3
    n_rows = ceil(len(records) / n_cols)

    fig = tools.make_subplots(rows=n_rows, cols=n_cols,
                              subplot_titles=[record.get('group_topic', '') for record in records])

    fig['layout'].update(height=1600, width=900, title='Most characteristic weekday dist', showlegend=False)

    # Grid positions in row-major order; plotly subplot indices are 1-based.
    grid_cells = [(row, column)
                  for row in range(1, n_rows + 1)
                  for column in range(1, n_cols + 1)]

    for record, (cur_row, cur_col) in zip(records, grid_cells):
        shares = record.get('topic_dist', [0] * len(weekdays))
        fig.append_trace(go.Bar(x=weekdays, y=shares), cur_row, cur_col)

    iplot(fig, filename='make-subplots-multiple-with-titles')
    
# Serialize up to 10 rows to JSON strings (plot_hist parses each entry with
# json.loads) and plot them.
entries = jsDivergence.toJSON().take(10)

plot_hist(entries)

#### Calculate the 'New Year's Resolutions Effect' to establish which tags gained the most interest between December and January

In [None]:
# every topic present in the data (covers topics that occurred in one month but not the other)
t = (
    df
    .select(explode(col("group.group_topics")).alias("topic"))
    .select(col("topic").getItem("urlkey").alias("topic"))
    .distinct()
)

def _topic_counts_for_month(year, month, topic_column, alias):
    """Count exploded group topics of events whose local year and month match.

    `year` and `month` are compared against positions 0 and 1 of
    event_time_localized; the result has columns `topic_column` and `count`
    and is aliased as `alias`.
    """
    in_month = (col("event_time_localized")[0] == year) & (col("event_time_localized")[1] == month)

    return (
        dfWithLocalizedEventTime
        .where(in_month)
        .select(explode("group.group_topics").alias("topic"))
        .withColumn(topic_column, col("topic").getItem("urlkey"))
        .groupBy(topic_column)
        .count()
        .alias(alias)
    )


# select topics from january, 2019
m1 = _topic_counts_for_month('2019', '1', "topic_m1", "m1")

# select topics from december, 2018
m2 = _topic_counts_for_month('2018', '12', "topic_m2", "m2")

# calculate the change in interest for every topic (m1 = january count, m2 = december count)
topic_interest = (
    t
    .join(m2, t.topic == m2.topic_m2, how='full')
    .join(m1, t.topic == m1.topic_m1, how='full')
    # Resolve both aliased counts in one projection and default missing months
    # to the integer 0 (a string "0" would turn the counts into strings).
    .select(
        col("topic"),
        coalesce(col("m1.count"), lit(0)).alias("m1"),
        coalesce(col("m2.count"), lit(0)).alias("m2"),
    )
    # Average volume weighted by the january/december ratio. The ratio is null
    # when m2 is 0, so brand-new topics get a null change and sort last below.
    .withColumn("change", ((col("m1")+col("m2"))/2)*(col("m1")/col("m2")))
)

# select the 20 topics with the highest change
increase = topic_interest \
    .sort(col("change").desc()) \
    .limit(20)

increase.show()

# show topics that were absent in december, 2018 but appeared in january, 2019.
# Filter the full topic set, not `increase`: new topics have a null change and
# are therefore cut off by the top-20 limit.
new_topics = topic_interest \
    .where((col("m1") > 0) & (col("m2") == 0))

new_topics.show()