In [3]:
# !pip install pyspark

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
import pandas as pd
import random
from collections import Counter

import warnings
warnings.filterwarnings("ignore")

spark = SparkSession.builder.getOrCreate()
path_to_database = "total_long.csv"
path_to_geo_grid = "grid_out.csv"

# Create a Spark dataframe from the database

Using a predefined schema, load data from a CSV file into a Spark dataframe.

In [4]:
schema = StructType([
    StructField("", IntegerType(), False),
    StructField("tweet_id", StringType(), False),
    StructField("created_datetime", DateType(), True),
    StructField("content", StringType(), True),
    StructField("author_id", StringType(), True),
    StructField("place_id", StringType(), True),
    StructField("location", StringType(), True),
    StructField("longitude", FloatType(), True),
    StructField("latitude", FloatType(), True)
])

df = spark.read.option("header", "true")\
               .option("multiLine", "true")\
               .option("inferSchema", "true")\
               .option("escapeQuotes", "true")\
               .option("escape", "\"")\
               .schema(schema)\
               .csv(path_to_database)

df = df.drop(col(""))
df.summary().show()

+-------+--------------------+--------------------+--------------------+-----------------+-------------+-------------------+------------------+
|summary|            tweet_id|             content|           author_id|         place_id|     location|          longitude|          latitude|
+-------+--------------------+--------------------+--------------------+-----------------+-------------+-------------------+------------------+
|  count|              316749|              316749|              316749|           308781|       308672|             316749|            316749|
|   mean|7.449569998604439E17|                null|7.725393844630732...|         Infinity|         null|-118.91889219910098| 35.22396342924302|
| stddev|4.066138605703703E17|                null|2.722915775825696E17|              NaN|         null|  1.967368201338569|1.9001820040807935|
|    min|  100001445859958785|! ! !Happy 4th of...|           100000075| 000385f04a7b87b4|    1700 Naud|          -124.4095|         32.

In [5]:
df.count(), df.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- created_datetime: date (nullable = true)
 |-- content: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- place_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)



(316749, None)

# Create a Spark DataFrame for the grid data

Retrieve 5 columns from the geo-grid CSV file. Create a Spark dataframe from a list of lists.

In [6]:
grid_df = pd.read_csv(path_to_geo_grid)
grid = [[line[2]]+line[5:9] for line in grid_df.values.tolist()]
grid_columns = ['county', 'xmin', 'xmax', 'ymin', 'ymax']

grid_df = spark.sparkContext.parallelize(grid).toDF(grid_columns)
grid_df.summary().show()

+-------+-------+-------------------+-------------------+------------------+------------------+
|summary| county|               xmin|               xmax|              ymin|              ymax|
+-------+-------+-------------------+-------------------+------------------+------------------+
|  count|   4090|               4090|               4090|              4090|              4090|
|   mean|   null|-119.60838001171895|-119.49324354572634| 37.10932030642118| 37.20110690253981|
| stddev|   null|  2.503688232413515| 2.5012656519059333|2.5589218594621417|2.5584634655327196|
|    min|ALAMEDA|   -124.40959633837|  -124.293052871327|  32.5655303544732|  32.6456642111328|
|    25%|   null|   -121.57333607153|  -121.454120599266|  34.9615130471188|  35.0556577987553|
|    50%|   null|  -120.043471690165|  -119.930633160645|  36.8203194609369|  36.9126132782289|
|    75%|   null|  -117.518246629714|  -117.405495472034|  39.2680527706111|  39.3593379859515|
|    max|   YUBA|  -114.282891010866|  -

In [7]:
grid_df.count(), grid_df.printSchema()

root
 |-- county: string (nullable = true)
 |-- xmin: double (nullable = true)
 |-- xmax: double (nullable = true)
 |-- ymin: double (nullable = true)
 |-- ymax: double (nullable = true)



(4090, None)

# Perform a left join using Spark SQL on two tables created from dataframes

We utilized Spark Views to create Spark tables from dataframes, performed a left join between the database dataframe and grid dataframe, and saved the resulting merged dataframe as a single CSV file.

In [8]:
df.createOrReplaceTempView("database")
spark.sql("select count(*) from database").show(5)

+--------+
|count(1)|
+--------+
|  316749|
+--------+



In [9]:
grid_df.createOrReplaceTempView("grid")
spark.sql("select count(*) from grid").show(5)

+--------+
|count(1)|
+--------+
|    4090|
+--------+



In [10]:
t1 = time.time()

new = spark.sql("select d.*, g.county \
                 from database d left join grid g \
                 on d.longitude <= g.xmax and d.longitude >= g.xmin \
                 and d.latitude <= g.ymax and d.latitude >= g.ymin")
new.show(5)

t2 = time.time()
print(f"Executing time: {t2-t1:.3f}s")

+-------------------+----------------+--------------------+----------+----------------+---------------+----------+---------+---------------+
|           tweet_id|created_datetime|             content| author_id|        place_id|       location| longitude| latitude|         county|
+-------------------+----------------+--------------------+----------+----------------+---------------+----------+---------+---------------+
| 100001445859958785|      2011-08-07|Bear Grylls just ...|  18003609|fbd6d2f5a4e4a15e|California, USA| -120.6458|35.247868|SAN LUIS OBISPO|
|1000015974010007552|      2018-05-25|@StacyGSG love th...| 596144748|a592bd6ceb1319f7|  San Diego, CA|-117.10973|32.801037|      SAN DIEGO|
|1000016540027142144|      2018-05-25|Kid: what is this...|  31197211|0c2e6999105f8070|    Anaheim, CA|-117.82906| 33.81007|         ORANGE|
|1000020809799254018|      2018-05-25|Next weekend’s Go...|2821288277|ef74afb7ccba74d6|  Placentia, CA|-117.82659|33.900215| SAN BERNARDINO|
|100003335889

In [11]:
t3 = time.time()

new.coalesce(1).write\
               .option("header", "true")\
               .option("multiLine", "true")\
               .option("inferSchema", "true")\
               .option("escapeQuotes", "true")\
               .option("escape", "\"")\
               .csv("merge.csv")

t4 = time.time()
print(f"Executing time: {t4-t3:.3f}s")

Executing time: 17.114s


# Verify data quality, deduplicate and deal with records with multiple county labels

In [12]:
tt = pd.read_csv('merge.csv/part-00000-748ca533-6d45-4b3c-8940-f810727339fe-c000.csv')
tt = tt.astype({'tweet_id':'str', 'author_id':'str', 'place_id':'str'})
tt.created_datetime = pd.to_datetime(tt.created_datetime).dt.date
tt.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 325769 entries, 0 to 325768
Data columns (total 9 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   tweet_id          325769 non-null  object 
 1   created_datetime  325769 non-null  object 
 2   content           325769 non-null  object 
 3   author_id         325769 non-null  object 
 4   place_id          325769 non-null  object 
 5   location          317405 non-null  object 
 6   longitude         325769 non-null  float64
 7   latitude          325769 non-null  float64
 8   county            325769 non-null  object 
dtypes: float64(2), object(7)
memory usage: 22.4+ MB


In [13]:
len(set(tt.county)) # should be 58

58

In [14]:
len(tt.tweet_id.unique()) # not equals to 325769 means duplicates exist

316749

#### We have observed the presence of duplicates in the dataset and will focus on deduplication. To identify duplicates, we use the unique identifier "tweet_id". 

#### Upon further analysis, we have found that almost all duplicates occur twice in the dataset.

In [15]:
multiple = [(k, v) for k, v in Counter(tt.tweet_id).items() if v > 1]
len(multiple)

9020

In [16]:
# all duplicates occur twice
multiple[0][1], multiple[-1][1]

(2, 2)

#### We will review all duplicate records and move those with the same county label to the drop box, while others will be placed in the check box for further verification.

#### Some tweets may have been assigned multiple county labels due to their location falling on the boundaries of different counties. To handle this, we will randomly select one label to keep and discard the others to ensure that each tweet is associated with only one county.

In [17]:
def split_drop_check(tt, multiple):
    drop_index = []
    check_index = []
    for k, v in multiple:
        c1, c2 = tt.county.loc[tt.tweet_id==k]
        if c1 == c2:
            drop_index.append(tt.index[tt.tweet_id==k].tolist()[-1])
        else:
            check_index.append(tt.index[tt.tweet_id==k].tolist())
    return drop_index, check_index

In [18]:
drop_index, check_index = split_drop_check(tt, multiple)
len(drop_index), len(check_index)

(7204, 1816)

In [19]:
# verify records with different county labels manually
print(tt.iloc[297])
print(tt.iloc[298])

tweet_id                               1.0018506864415826e+18
created_datetime                                   2018-05-30
content             Fuck yes bear 😍😭💘 https://t.co/avpZVR32hv
author_id                                         895879717.0
place_id                                     6ba08e404aed471f
location                                        Riverside, CA
longitude                                          -117.39284
latitude                                            33.936188
county                                         SAN BERNARDINO
Name: 297, dtype: object
tweet_id                               1.0018506864415826e+18
created_datetime                                   2018-05-30
content             Fuck yes bear 😍😭💘 https://t.co/avpZVR32hv
author_id                                         895879717.0
place_id                                     6ba08e404aed471f
location                                        Riverside, CA
longitude                                    

In [20]:
check_drop_index = [random.choice([i1, i2]) for i1, i2 in check_index]
len(check_drop_index)

1816

In [21]:
df = tt.drop(drop_index + check_drop_index).reset_index()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 316749 entries, 0 to 316748
Data columns (total 10 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   index             316749 non-null  int64  
 1   tweet_id          316749 non-null  object 
 2   created_datetime  316749 non-null  object 
 3   content           316749 non-null  object 
 4   author_id         316749 non-null  object 
 5   place_id          316749 non-null  object 
 6   location          308672 non-null  object 
 7   longitude         316749 non-null  float64
 8   latitude          316749 non-null  float64
 9   county            316749 non-null  object 
dtypes: float64(2), int64(1), object(7)
memory usage: 24.2+ MB


# Save a database with unique records

In [22]:
df.to_csv('merge_unique.csv') 

In [23]:
spark.stop()