In [1]:
import os
import sys
import socket
import re
import numpy as np
import string
import warnings
from timeit import default_timer as timer
from datetime import datetime
from glob import glob

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,desc,row_number,col,year,month,dayofmonth,dayofweek,to_timestamp,size,isnan,when,count,col,count,lit,sum
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField, FloatType, ArrayType
from py4j.java_gateway import java_import
from functools import reduce
from pyspark.sql import DataFrame

# Config

In [4]:
# Name of the per-country sub-folder read from chunks/ and written under extract/.
# NOTE(review): the saved output of this cell shows 'united-states', so the folder
# naming used when the data was produced may differ from "US" — confirm.
country_code = 'US'
print('Country:', country_code)

Country: united-states


In [5]:
# Decide once whether this runs on the author's laptop (hostname contains 'samuel')
# or on the cluster; the answer selects both the Spark config and the data root.
_hostname = socket.gethostname()
_is_local = 'samuel' in _hostname.lower()

# Reuse a SparkSession the kernel already holds (e.g. when re-running this cell);
# only build a new one when `spark` is not defined yet.
try:
    spark
except NameError:
    _builder = SparkSession.builder
    if _is_local:
        print('Create Local SparkSession')
        _builder = _builder.config("spark.driver.host", "localhost")
    else:
        print('Create Cluster SparkSession')
    spark = _builder.appName("extract-timelines").getOrCreate()

# Skip unreadable input files instead of failing the whole read.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

print('Hostname:', _hostname)
path_to_data = '../../data/timelines/' if _is_local else '/user/spf248/twitter/data/timelines/'

Hostname: Samuels-MacBook-Pro.local


In [None]:
# Load every parquet file one directory level below chunks/<country_code>/.
print('IMPORT')
chunk_glob = os.path.join(path_to_data, 'chunks', country_code, '*/*.parquet')
df = spark.read.parquet(chunk_glob)

# Spread rows over 1000 partitions.
# NOTE(review): the dedup below shuffles on tweet_id on its own, so this
# round-robin layout is likely not preserved downstream — confirm it is needed.
print('REPARTITION')
df = df.repartition(1000)

# Keep a single row per tweet_id (presumably chunks can overlap — verify).
print("DROP DUPLICATE IDS")
df = df.dropDuplicates(subset=['tweet_id'])

In [None]:
n_tweets = df.count()  # full Spark action over the current `df`
print('# TWEETS:', n_tweets)

In [None]:
# Number of distinct user_id values in the current `df`.
n_users = df.select("user_id").distinct().count()
print('# USERS:', n_users)

In [None]:
def count_not_null(c, nan_as_null=False):
    """Build an aggregate expression counting rows where column `c` is populated.

    A row counts when the value is non-null and, if `nan_as_null` is True, also
    not NaN. The boolean predicate is cast to integer (False -> 0, True -> 1)
    and summed.

    Parameters
    ----------
    c : str
        Column name; also used as the alias of the resulting aggregate.
    nan_as_null : bool, default False
        When True, NaN values are treated as missing in addition to nulls.

    Returns
    -------
    Column
        Aggregate column suitable for ``DataFrame.agg``.
    """
    # NOTE(review): isnan is applied to non-float columns too (ids, text), which
    # relies on Spark implicitly casting them to double — confirm under ANSI mode.
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    # Explicit F.sum so the Python builtin `sum` can never be picked up by accident.
    return F.sum(pred.cast("integer")).alias(c)


print('COUNT VALUES THAT ARE NON-NULL AND NON-NAN')
# .show() prints the table itself and returns None; wrapping it in print()
# used to emit a stray "None" after the table.
df.agg(*[count_not_null(c, True) for c in [
    'tweet_id',
    'text',
    'tweet_lang',
    'user_id',
    'user_location',
    'place_id',
    'tweet_longitude',
    'tweet_latitude',
]]).show()

# Split By Month and Year

In [1]:
def month_year_iter(start_month, start_year, end_month, end_year):
    """Yield consecutive ``(year, month)`` tuples with month in 1..12.

    Starts at (start_year, start_month) inclusive and stops before
    (end_year, end_month), i.e. the end month is exclusive. Yields nothing
    when the end is not after the start.
    """
    # Flatten each date into one running month counter (month 0-based inside
    # it) so stepping across year boundaries needs no special casing.
    first_index = start_year * 12 + (start_month - 1)
    stop_index = end_year * 12 + (end_month - 1)
    yield from ((index // 12, index % 12 + 1) for index in range(first_index, stop_index))

In [None]:
# Add string-typed 'year' and 'month' columns (month is not zero-padded, e.g. "1").
# NOTE(review): assumes created_at is a timestamp or a string Spark can parse as
# one; otherwise year()/month() yield null and those rows are never exported.
print("EXTRACT YEAR AND MONTH")
df = (
    df
    .withColumn('year', year('created_at').cast("string"))
    .withColumn('month', month('created_at').cast("string"))
)

In [None]:
print('SAVE')
start = timer()

# Months to export: Jan 2012 through Dec 2019 (month_year_iter's end bound is exclusive).
dates = list(month_year_iter(1, 2012, 1, 2020))

# Write one parquet folder per month, named "<year>-<month>" (month not zero-padded).
# Iterate over EVERY pair: the previous `range(len(dates) - 1)` loop silently skipped
# the last month (2019-12) on top of month_year_iter's own exclusive end.
# NOTE(review): nothing caches `df`, so each iteration presumably recomputes the
# full read + dedup lineage — consider persist() if this is too slow.
for y, m in dates:
    output_path = os.path.join(path_to_data, 'extract', country_code, str(y) + '-' + str(m))
    (
        # 'year'/'month' are string columns, so compare against string literals.
        df.filter((df["year"] == str(y)) & (df["month"] == str(m)))
        .drop('year', 'month')
        .write.mode("overwrite")
        .parquet(output_path)
    )

end = timer()
print('DONE IN', round(end - start), 'SEC')

In [1]:
# Job duration in hours, from start/end timestamps that look like epoch
# milliseconds (presumably copied from a Spark job UI — verify source).
job_start_ms = 1579891482128
job_end_ms = 1579892135118
ms_per_hour = 1000 * 3600
print('Computing Time:', round((job_end_ms - job_start_ms) / ms_per_hour, 2))

Computing Time: 0.18


N TWEETS: 1,620,393,106
    
N USERS: 1,541,121

COUNT VALUES THAT ARE NON-NULL AND NON-NAN
+----------+----------+----------+----------+-------------+--------+---------------+--------------+
|  tweet_id|      text|tweet_lang|   user_id|user_location|place_id|tweet_longitude|tweet_latitude|
+----------+----------+----------+----------+-------------+--------+---------------+--------------+
|1620393106|1620393105|1620393106|1620393106|   1620393106|59191039|       22189200|      22189200|
+----------+----------+----------+----------+-------------+--------+---------------+--------------+