In [1]:
import pyspark_cassandra

In [2]:
from pyspark.sql.types import *

In [3]:
# Explicit schema for the RSVP JSON records; every field is declared nullable.

# Nested `event` object.
event = (
    StructType()
    .add('event_id', StringType(), True)
    .add('event_name', StringType(), True)
    .add('event_url', StringType(), True)
    .add('time', LongType(), True)
)

# Element type of `group.group_topics`: an array of {urlkey, topic_name} structs.
group_topics = ArrayType(
    StructType()
    .add('urlkey', StringType(), True)
    .add('topic_name', StringType(), True)
)

# Nested `group` object, including its list of topics.
group = (
    StructType()
    .add('group_city', StringType(), True)
    .add('group_country', StringType(), True)
    .add('group_id', IntegerType(), True)
    .add('group_name', StringType(), True)
    .add('group_lon', FloatType(), True)
    .add('group_lat', FloatType(), True)
    .add('group_urlname', StringType(), True)
    .add('group_state', StringType(), True)
    .add('group_topics', group_topics, True)
)

# Nested `venue` object.
venue = (
    StructType()
    .add('venue_name', StringType(), True)
    .add('lon', FloatType(), True)
    .add('lat', FloatType(), True)
    .add('venue_id', IntegerType(), True)
)

# Nested `member` object.
member = (
    StructType()
    .add('member_id', IntegerType(), True)
    .add('photo', StringType(), True)
    .add('member_name', StringType(), True)
)

# Top-level record: the nested objects above plus the scalar RSVP fields.
schema = (
    StructType()
    .add('event', event, True)
    .add('group', group, True)
    .add('venue', venue, True)
    .add('visibility', StringType(), True)
    .add('response', StringType(), True)
    .add('guests', IntegerType(), True)
    .add('member', member, True)
    .add('rsvp_id', LongType(), True)
    .add('mtime', LongType(), True)
)

In [4]:
# Read the RSVP dump using the explicit `schema` rather than letting Spark infer one.
# NOTE(review): this is an absolute, machine-specific path — confirm before re-running elsewhere.
df = spark.read.json(
    path='file:///home/bondk/Data/2018-03-29_061608.json',
    schema=schema,
)

In [5]:
# Print the schema of the loaded DataFrame as a tree to check it matches the declared one.
df.printSchema()

root
 |-- event: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_url: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- group: struct (nullable = true)
 |    |-- group_city: string (nullable = true)
 |    |-- group_country: string (nullable = true)
 |    |-- group_id: integer (nullable = true)
 |    |-- group_name: string (nullable = true)
 |    |-- group_lon: float (nullable = true)
 |    |-- group_lat: float (nullable = true)
 |    |-- group_urlname: string (nullable = true)
 |    |-- group_state: string (nullable = true)
 |    |-- group_topics: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- urlkey: string (nullable = true)
 |    |    |    |-- topic_name: string (nullable = true)
 |-- venue: struct (nullable = true)
 |    |-- venue_name: string (nullable = true)
 |    |-- lon: float (nullable = true)
 |    |-- lat: float (nullable =

In [6]:
from datetime import datetime
import pytz

```sql
-- Cassandra table that the final cell of this notebook appends to.
-- Partition key: mdate. Clustering columns (in order): count, group_country, event_id.
-- NOTE(review): the DataFrame written at the end of the notebook also selects a
-- `total` column, which is not declared below — confirm the live table schema.
CREATE TABLE IF NOT EXISTS meetup_analysis.world_stat_by_date (
    mdate date,
    group_country text,
    group_city text,
    event_name text,
    event_id text,
    event_time timestamp,
    venue_name text,
    count int,
    event_url text,
    lon float,
    lat float,
    topics set<text>,
PRIMARY KEY ((mdate), count, group_country, event_id));
```

In [7]:
# Keep only public "yes" RSVPs, then flatten the nested fields needed downstream.
# The filter runs before the projection because `visibility` and `response`
# are not part of the selected columns.
selected_country_df = (
    df
    .filter((df['visibility'] == 'public') & (df['response'] == 'yes'))
    .select(
        'mtime',
        'group.group_country',
        'group.group_city',
        'event.event_name',
        'event.event_id',
        'event.time',
        'venue.venue_name',
        'event.event_url',
        'venue.lon',
        'venue.lat',
        'group.group_topics',
    )
)
selected_country_df.printSchema()

root
 |-- mtime: long (nullable = true)
 |-- group_country: string (nullable = true)
 |-- group_city: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- time: long (nullable = true)
 |-- venue_name: string (nullable = true)
 |-- event_url: string (nullable = true)
 |-- lon: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- group_topics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- urlkey: string (nullable = true)
 |    |    |-- topic_name: string (nullable = true)



In [8]:
from pyspark.sql.functions import *
import pytz

In [9]:
def parse_date(unix_time):
    """Convert a Unix timestamp in milliseconds to a UTC calendar date.

    Args:
        unix_time: Milliseconds since the Unix epoch, or None.

    Returns:
        A ``datetime.date`` in UTC, or None when ``unix_time`` is None.
    """
    # Test for None explicitly: a plain truthiness check would also discard
    # the valid timestamp 0 (1970-01-01).
    if unix_time is None:
        return None
    # fromtimestamp() expects seconds, hence the division by 1000.
    return datetime.fromtimestamp(unix_time / 1000, tz=pytz.utc).date()

def parse_time(unix_time):
    """Convert a Unix timestamp in milliseconds to a timezone-aware UTC datetime.

    Args:
        unix_time: Milliseconds since the Unix epoch, or None.

    Returns:
        A ``datetime.datetime`` with UTC tzinfo, or None when ``unix_time`` is None.
    """
    # Test for None explicitly: a plain truthiness check would also discard
    # the valid timestamp 0 (1970-01-01T00:00:00Z).
    if unix_time is None:
        return None
    # fromtimestamp() expects seconds, hence the division by 1000.
    return datetime.fromtimestamp(unix_time / 1000, tz=pytz.utc)
    
def merge_topics(arr):
    """Extract the ``topic_name`` of every topic in a group's topic list.

    Args:
        arr: Iterable of topic rows (objects exposing ``asDict()`` with a
            ``topic_name`` key), or None when the group has no topic list.

    Returns:
        A list of topic names in input order, or None when ``arr`` is None.
        None elements inside ``arr`` are skipped.
    """
    # The source column is nullable and may contain null elements, so neither
    # the array nor its items can be assumed to be present.
    if arr is None:
        return None
    return [topic.asDict()['topic_name'] for topic in arr if topic is not None]

# Wrap the plain-Python helpers as Spark UDFs, declaring the Spark type each returns.
parse_date_udf = udf(parse_date, returnType=DateType())
parse_time_udf = udf(parse_time, returnType=TimestampType())
merge_topics_udf = udf(merge_topics, returnType=ArrayType(StringType()))

In [10]:
# Build the per-RSVP frame that every later aggregation starts from:
#   time         -> event_time (timestamp, via parse_time_udf)
#   mtime        -> mdate (date, via parse_date_udf); only 2018-03-29 is kept
#   group_topics -> topics (array of topic names, via merge_topics_udf)
country_df = (
    selected_country_df
    .withColumn('time', parse_time_udf(selected_country_df['time']))
    .withColumnRenamed('time', 'event_time')
    .withColumn('mtime', parse_date_udf(selected_country_df['mtime']))
    .withColumnRenamed('mtime', 'mdate')
    .where("mdate = cast('2018-03-29' as DATE)")
    .withColumn('group_topics', merge_topics_udf(selected_country_df['group_topics']))
    .withColumnRenamed('group_topics', 'topics')
)

country_df.show()

+----------+-------------+----------+--------------------+------------+-------------------+--------------------+--------------------+----------+---------+--------------------+
|     mdate|group_country|group_city|          event_name|    event_id|         event_time|          venue_name|           event_url|       lon|      lat|              topics|
+----------+-------------+----------+--------------------+------------+-------------------+--------------------+--------------------+----------+---------+--------------------+
|2018-03-29|           us|  New York|NOTE - @IBM Atriu...|   249212630|2018-04-07 11:00:00|          IBM ATRIUM|https://www.meetu...|    -73.97|    40.76|[Intellectual Dis...|
|2018-03-29|           us| Las Vegas|Free Business Net...|   244917823|2018-04-05 18:30:00|Community Room at...|https://www.meetu...|-115.24644| 36.06594|[Network Marketin...|
|2018-03-29|           us|    Irvine|Deerfield Park Sa...|fhfvkpyxfbmc|2018-03-29 16:00:00|Deerfield Communi...|https://

In [11]:
# Print the schema to confirm the renamed columns and their converted types.
country_df.printSchema()

root
 |-- mdate: date (nullable = true)
 |-- group_country: string (nullable = true)
 |-- group_city: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- venue_name: string (nullable = true)
 |-- event_url: string (nullable = true)
 |-- lon: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [12]:
# Number of rows in country_df per country (one row per retained RSVP).
country_total = (
    country_df
    .groupby(['group_country'])
    .count()
)

country_total.show()

+-------------+-----+
|group_country|count|
+-------------+-----+
|           cr|    3|
|           us|39501|
|           eg|   29|
|           ge|    6|
|           il|  554|
|           cl|  256|
|           ro|  152|
|           ba|    4|
|           jp| 1035|
|           kw|    3|
|           by|    7|
|           lv|   12|
|           vn|   60|
|           pl|  937|
|           cn|  160|
|           za|  366|
|           sk|   13|
|           bd|   41|
|           mu|    2|
|           pt|  272|
+-------------+-----+
only showing top 20 rows



In [13]:
# Number of rows in country_df per (country, event) pair.
country_count = (
    country_df
    .groupby(['group_country', 'event_id'])
    .count()
)
country_count.show()

+-------------+------------+-----+
|group_country|    event_id|count|
+-------------+------------+-----+
|           hk|   248347178|    2|
|           au|   247693201|    6|
|           au|   249217975|    3|
|           au|   249212482|    2|
|           us|   248782765|    1|
|           fr|   249219252|    3|
|           us|   248624894|    1|
|           us|   249164453|    1|
|           gb|   249202007|    1|
|           gb|jfbvcpyxjbnc|    2|
|           il|   249148989|   24|
|           in|jfzfkpyxfbmc|    2|
|           in|   248665498|    1|
|           hk|pnpvxlyzfblc|    1|
|           de|pzfmkpyxgbhb|    3|
|           gb|   249092419|    1|
|           pt|   249006060|    1|
|           gb|   248303101|   11|
|           gb|zxkrjpyxfbmc|    1|
|           gb|ftmtdpyxgbfb|    1|
+-------------+------------+-----+
only showing top 20 rows



In [14]:
# Largest per-event count within each country. GroupedData.max() names its
# output column 'max(count)', so it is renamed to 'max' afterwards.
country_max = (
    country_count
    .groupBy('group_country')
    .max('count')
    .withColumnRenamed('max(count)', 'max')
    .withColumnRenamed('group_country', 'country')
)
country_max.show()

+-------+---+
|country|max|
+-------+---+
|     cr|  2|
|     us|122|
|     eg|  6|
|     ge|  4|
|     il| 51|
|     cl| 47|
|     ro| 20|
|     ba|  3|
|     jp| 20|
|     kw|  3|
|     by|  7|
|     lv|  5|
|     vn|  4|
|     pl| 71|
|     cn| 18|
|     za| 23|
|     sk|  4|
|     bd| 25|
|     mu|  2|
|     pt| 21|
+-------+---+
only showing top 20 rows



In [15]:
# Keep, for each country, the event(s) whose count equals that country's maximum.
# Events tied for the maximum all survive the join, so a country can appear more than once.
country_most_popular_event = (
    country_count
    .join(
        broadcast(country_max),
        on=(
            (country_count['group_country'] == country_max['country'])
            & (country_count['count'] == country_max['max'])
        ),
        how='inner',
    )
    .select('country', col('event_id').alias('id'), 'max')
)
country_most_popular_event.show()

+-------+---------+---+
|country|       id|max|
+-------+---------+---+
|     mt|247017657|  2|
|     ae|249221874| 13|
|     cd|248532148|  2|
|     rw|248314612|  1|
|     hk|249223168| 11|
|     md|248571847|  8|
|     li|249155290|  4|
|     ec|248888214|  5|
|     gh|248131180|  2|
|     au|248205529| 28|
|     ua|247766004| 16|
|     de|249219430| 57|
|     np|249124429|  7|
|     bo|249192337| 10|
|     se|249220929| 67|
|     lk|249026348| 17|
|     sk|249224454|  4|
|     gh|248221282|  2|
|     ke|248948227|  6|
|     cr|248106326|  2|
+-------+---------+---+
only showing top 20 rows



In [16]:
# Attach each country's overall row count (`count`) to its most popular event(s).
country_popular_event_and_total = (
    country_most_popular_event
    .join(
        broadcast(country_total),
        on=country_most_popular_event['country'] == country_total['group_country'],
    )
    .select('country', 'id', 'max', 'count')
)

country_popular_event_and_total.show()

+-------+---------+---+-----+
|country|       id|max|count|
+-------+---------+---+-----+
|     mt|247017657|  2|    2|
|     ae|249221874| 13|  454|
|     cd|248532148|  2|    2|
|     rw|248314612|  1|    1|
|     hk|249223168| 11|  533|
|     md|248571847|  8|   11|
|     li|249155290|  4|    4|
|     ec|248888214|  5|    8|
|     gh|248131180|  2|    9|
|     au|248205529| 28| 2134|
|     ua|247766004| 16|  120|
|     de|249219430| 57| 2901|
|     np|249124429|  7|   11|
|     bo|249192337| 10|   37|
|     se|249220929| 67|  509|
|     lk|249026348| 17|   79|
|     sk|249224454|  4|   13|
|     gh|248221282|  2|    9|
|     ke|248948227|  6|   48|
|     cr|248106326|  2|    3|
+-------+---------+---+-----+
only showing top 20 rows



In [17]:
# Pull the full event details for the most popular events. country_df holds one
# row per RSVP, so the joined rows are de-duplicated; the per-country row count
# is renamed from `count` to `total`.
country_popular_event_detail_and_total = (
    country_df
    .join(
        broadcast(country_popular_event_and_total),
        on=country_df['event_id'] == country_popular_event_and_total['id'],
    )
    .distinct()
    .withColumnRenamed('count', 'total')
)

country_popular_event_detail_and_total.show()

+----------+-------------+-------------+--------------------+------------+-------------------+--------------------+--------------------+----------+---------+--------------------+-------+------------+---+-----+
|     mdate|group_country|   group_city|          event_name|    event_id|         event_time|          venue_name|           event_url|       lon|      lat|              topics|country|          id|max|total|
+----------+-------------+-------------+--------------------+------------+-------------------+--------------------+--------------------+----------+---------+--------------------+-------+------------+---+-----+
|2018-03-29|           it|      Bologna|Hyperledger : Blo...|   249221467|2018-06-13 10:00:00|   Osteria La Frasca|https://www.meetu...| 11.327358| 44.49448|[Decentralized Sy...|     it|   249221467| 15|  470|
|2018-03-29|           il|Tel Aviv-Yafo|Serverless meet E...|   248295491|2018-04-15 07:30:00|           Mindspace|https://www.meetu...|  34.77411|32.064617|[Ja

In [18]:
# Print the joined schema: event detail columns followed by country, id, max and total.
country_popular_event_detail_and_total.printSchema()

root
 |-- mdate: date (nullable = true)
 |-- group_country: string (nullable = true)
 |-- group_city: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- venue_name: string (nullable = true)
 |-- event_url: string (nullable = true)
 |-- lon: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- max: long (nullable = true)
 |-- total: long (nullable = false)



In [20]:
# Project the columns to be persisted, exposing the per-event maximum (`max`)
# under the name `count`; the join helper columns `country` and `id` are dropped.
country_popular_event_detail_and_total_rename = country_popular_event_detail_and_total.select(
    'mdate',
    'group_country',
    'total',
    'group_city',
    'event_name',
    'event_id',
    'event_time',
    'venue_name',
    col('max').alias('count'),
    'event_url',
    'lon',
    'lat',
    'topics',
)
country_popular_event_detail_and_total_rename.printSchema()

root
 |-- mdate: date (nullable = true)
 |-- group_country: string (nullable = true)
 |-- total: long (nullable = false)
 |-- group_city: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- venue_name: string (nullable = true)
 |-- count: long (nullable = true)
 |-- event_url: string (nullable = true)
 |-- lon: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [22]:
# Append the result to meetup_analysis.world_stat_by_date through the Spark Cassandra data source.
# NOTE(review): the frame includes a `total` column that the CREATE TABLE statement
# shown earlier in this notebook does not declare — confirm the live table schema.
(
    country_popular_event_detail_and_total_rename.write
    .format("org.apache.spark.sql.cassandra")
    .mode('append')
    .options(keyspace="meetup_analysis", table="world_stat_by_date")
    .save()
)