In [18]:
import os
import sys
import socket
import re
import numpy as np
import string
import warnings
from timeit import default_timer as timer
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,desc,row_number,col,year,month,dayofmonth,dayofweek,to_timestamp,size,isnan,when,count,col,count,lit,sum
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField, FloatType, ArrayType
from py4j.java_gateway import java_import
from functools import reduce
from pyspark.sql import DataFrame

# Config

In [3]:
# Country to process: selects the input sub-directory under path_to_data and
# names the output folder under path_to_data/extract.
country_name = "united-states"
print(f'Country: {country_name}')

Country: mexico


In [4]:
# Reuse an existing SparkSession if one is already bound to `spark` (e.g. from a
# previous run of this cell); otherwise build one. A hostname containing 'samuel'
# marks the local development machine, anything else is treated as the cluster.
try:
    spark
except NameError:
    if 'samuel' not in socket.gethostname().lower():
        print('Create Cluster SparkSession')
        spark = SparkSession.builder.appName("extract-timelines").getOrCreate()
    else:
        print('Create Local SparkSession')
        spark = (
            SparkSession.builder
            .config("spark.driver.host", "localhost")
            .appName("extract-timelines")
            .getOrCreate()
        )

# Skip unreadable input files instead of failing the whole job.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

print('Hostname:', socket.gethostname())
# Root folder holding one sub-directory of timeline files per country.
path_to_data = (
    '../../data/timelines/'
    if 'samuel' in socket.gethostname().lower()
    else '/user/spf248/twitter/data/timelines/'
)

Create Cluster SparkSession
Hostname: c41-04


In [None]:
# List the country's timeline files through the JVM Hadoop FileSystem API, using the
# default filesystem from Spark's Hadoop configuration.
fs=spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status=fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(os.path.join(path_to_data,country_name)))
# toUri().getPath() keeps only the path component and drops the scheme and authority
# (e.g. 'hdfs://dumbo'), whatever the cluster is called, so no host name is hard-coded.
# Spark then resolves the bare path against the default filesystem.
paths=[file.getPath().toUri().getPath() for file in list_status]
paths=[path for path in paths if 'json.bz2' in path]
if not paths:
    warnings.warn('No json.bz2 files found under ' + os.path.join(path_to_data,country_name))
# Sort first so the shuffled order depends only on the seed, not on listing order.
# A local RandomState(0) draws the same permutation as np.random.seed(0) followed by
# np.random.permutation, without mutating numpy's global random state.
paths=np.random.RandomState(0).permutation(sorted(paths))
print('# Files:', len(paths))

In [None]:
# Import the shuffled file list in roughly equal-sized chunks, one chunk at a time.
n_chunks = 200
print('# Chunks:', n_chunks)
paths_chunks = np.array_split(paths, indices_or_sections=n_chunks)

# Import Data

In [None]:
# Read each chunk of bz2-compressed JSON timeline files into a DataFrame with the
# columns needed downstream. Chunks that fail are reported and skipped (best effort).
dfs=[]
# Indices of chunks whose import raised. These chunks are missing from `dfs`; they are
# kept here so the data loss is visible and the chunks can be retried.
failed_chunks=[]

for i,paths_chunk in enumerate(paths_chunks):

    # np.array_split yields empty chunks when there are fewer files than n_chunks,
    # and Spark cannot infer a JSON schema from zero files, so skip them explicitly.
    if len(paths_chunk)==0:
        print('SKIP EMPTY CHUNK', i)
        print()
        continue

    try:

        print('IMPORT CHUNK', i)
        start = timer()

        # NOTE(review): the schema is inferred separately for every chunk. If all records
        # in a chunk have a null `coordinates` or `place`, the nested select below may
        # fail to resolve and the chunk lands in failed_chunks. An explicit schema would
        # avoid that, and would also skip the inference pass.
        df=spark.read.option(
        "compression","bzip2").option(
        "multiLine","true").option(
        "encoding","UTF-8").json(list(paths_chunk))

        df=df.select(
        'id_str',
        'created_at',
        'full_text',
        'lang',
        'user.id_str',
        'user.location',
        'coordinates.coordinates',
        'place.id',
        )

        # Rename positionally. This is needed because 'id_str' and 'user.id_str' both
        # come out of the select as a column named 'id_str'.
        df = df.toDF(*[
        'tweet_id',
        'created_at',
        'text',
        'tweet_lang',
        'user_id',
        'user_location',
        'tweet_coordinates',
        'place_id',
        ])

        # NOTE(review): this pattern presumably targets Twitter's created_at format
        # (e.g. 'Wed Oct 10 20:19:24 +0000 2018') and relies on the Spark 2.x
        # (SimpleDateFormat) parser. Spark 3's parser rejects 'E' when parsing and reads
        # 'ZZZZZ' as '+00:00'. Set spark.sql.legacy.timeParserPolicy=LEGACY if upgrading
        # (TODO confirm).
        df = df.withColumn('created_at', to_timestamp('created_at',"EEE MMM dd HH:mm:ss ZZZZZ yyyy"))
        # The coordinates array is read as [longitude, latitude].
        df = df.withColumn('tweet_longitude', F.col('tweet_coordinates').getItem(0))
        df = df.withColumn('tweet_latitude',  F.col('tweet_coordinates').getItem(1))
        df = df.drop('tweet_coordinates')

        # count() is an action: it forces the chunk to actually be read, so unreadable
        # input fails here, inside the try, rather than later during the merge.
        print('# TWEETS:', df.count())

        dfs.append(df)

        end = timer()
        print('TIME:', round(end - start), 'SEC')

    except Exception as e:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt still stops
        # the loop. Report the cause (truncated, since JVM errors carry long stack traces)
        # so a dropped chunk is not silent.
        failed_chunks.append(i)
        print('ERROR WITH CHUNK', i, '-', type(e).__name__ + ':', str(e)[:500])

    print()

print('# FAILED CHUNKS:', failed_chunks)

# Finalize Dataset

In [None]:
# Merge the per-chunk DataFrames into one dataset with one row per tweet_id.
print('MERGE DFS')
if not dfs:
    # reduce() over an empty list raises an opaque TypeError; fail with a clear message.
    raise ValueError('No chunk was imported successfully; nothing to merge.')
df=reduce(DataFrame.unionByName, dfs)

print('REPARTITION')
# NOTE(review): drop_duplicates below shuffles again by tweet_id. The final partition
# count is therefore presumably governed by spark.sql.shuffle.partitions rather than
# 1000 — confirm which is intended.
df=df.repartition(1000)

print("DROP DUPLICATE IDS")
df=df.drop_duplicates(subset=['tweet_id'])

# Cache the final, de-duplicated frame: the later count/distinct/agg/write actions all
# reuse it. Caching the raw union instead would re-run the repartition and
# de-duplication shuffles for every one of those actions.
print('CACHE')
df.cache()

In [6]:
# Number of rows (tweets) in the de-duplicated dataset.
n_tweets = df.count()
print('# TWEETS:', n_tweets)

# TWEETS: 2252645


In [7]:
# Number of distinct user_id values (a null user_id, if any, counts as one value).
n_users = (
    df.select("user_id")
      .distinct()
      .count()
)
print('# USERS:', n_users)

# USERS: 307451


In [20]:
def count_not_null(c, nan_as_null=False):
    """Build an aggregate that counts the rows where column `c` holds a value.

    The per-row "has a value" predicate is cast to an integer (True -> 1,
    False -> 0) and summed, so null rows contribute nothing. When
    `nan_as_null` is True, NaN values are treated as missing as well.

    Returns the aggregate expression aliased to `c`, so the result column
    keeps the source column's name.
    """
    has_value = F.col(c).isNotNull()
    if nan_as_null:
        has_value = has_value & ~F.isnan(c)
    return F.sum(has_value.cast("integer")).alias(c)

# Report how many rows carry a usable value in each output column.
# NOTE(review): created_at is not checked. to_timestamp yields nulls for unparseable
# dates, so including it would surface parse failures — consider adding it.
completeness_columns = [
'tweet_id',
'text',
'tweet_lang',
'user_id',
'user_location',
'place_id',
'tweet_longitude',
'tweet_latitude',
]
# Only apply the NaN test to floating-point columns. On a string column, isnan casts
# the value to double, so a literal 'NaN' string could be counted as missing.
column_types = dict(df.dtypes)

print('COUNT VALUES THAT ARE NON-NULL AND NON-NAN')
# show() prints the table itself and returns None, so call it directly; wrapping it
# in print() only emitted a stray "None".
df.agg(*[
    count_not_null(c, column_types[c] in ('double', 'float'))
    for c in completeness_columns
]).show()

+--------+-------+----------+-------+-------------+--------+---------------+--------------+
|tweet_id|   text|tweet_lang|user_id|user_location|place_id|tweet_longitude|tweet_latitude|
+--------+-------+----------+-------+-------------+--------+---------------+--------------+
| 2252645|2252645|   2252645|2252645|      2252645|   63670|          28596|         28596|
+--------+-------+----------+-------+-------------+--------+---------------+--------------+

None


In [None]:
print('SAVE')
start = timer()

# Write the de-duplicated tweets as Parquet to <path_to_data>/extract/<country_name>,
# replacing any previous extract for this country.
output_path = os.path.join(path_to_data, 'extract', country_name)
(
    df.write
    .mode("overwrite")
    .parquet(output_path)
)

end = timer()
print('DONE IN', round(end - start), 'SEC')