In [1]:
# Authorize drive access
from google.colab import drive
drive.mount('/content/drive') #, force_remount=True

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Import and init

In [2]:
!pip install pyshp



In [3]:
!pip install pyspark



In [4]:
from pathlib import Path
import os

import zipfile
import io

import shapefile
import shapely
import numpy as np
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon
import pandas as pd

In [5]:
# data_path = Path("H:/Shared drives/taxi_trends/Data") # local
data_path = Path("/content/drive/Shareddrives/taxi_trends/Data") # in colab

In [None]:
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

# Polygon data is from a shapefile
poly = Polygon(points)
tocheck = Point(point)
if poly.contains(tocheck):
  return True

## Data Extraction

In [6]:
# def extractData(data_path, date_folder):
#     '''Function to extract data from twitter tar files
    
#     Args
#       data_path   - path where the tar files are stored, in the format of one tar file for each day
#       date_folder - specific folder to extract data
    
#     Returns
#       dataframe with twitter data extracted from all the files

#     Raises
#       IOError when tar extraction fails
#       Others when JSON load fails
#     '''
#     import tempfile, tarfile, json, os
#     writedir = tempfile.TemporaryDirectory()
#     writedir.name

#     twitter_data = []

#     # Loop through hourly tar files in each date folder and write the extracted output to a temp file
#     for fil in os.listdir(data_path + date_folder):
#         if fil.endswith('.gz'):
#           try:
#             tar = tarfile.open(data_path + date_folder + '/' + fil, 'r:gz')
#             tar.extractall(writedir.name)
#             tar.close()
#           except IOError:
#             print('Tar file not valid')
#             raise IOError

#     # Read the temp file and concat the output
#     temp_path = writedir.name + '/data/' + date_folder
#     for file in os.listdir(temp_path):
#         with open(temp_path+'/'+file, "r", encoding="utf8") as f:
#           try:
#             [twitter_data.append(json.loads(line)) for line in f.readlines()]
#           except someError:
#             print('JSON load failed: ')
#             raise someError

#     return twitter_data

In [7]:
# Initial extraction of data from json files, intermediate file has been saved

# from tqdm.auto import tqdm

# twitter_df = pd.DataFrame()
# twitter_path = Path(data_path / "twitter data").as_posix()
# folders = os.listdir(twitter_path)


# # For all folders in the data path, extract tweets from all the tar-json files and add to the dataframe
# for folder in tqdm(folders):
#     # print("Extracting from " + folder)
#     temp = pd.DataFrame(extractData(twitter_path, "/"+folder))
#     twitter_df = pd.concat((twitter_df, temp), axis=0)

# del temp
# twitter_df.to_csv(data_path / "twitter data" / "twitter_3mon_extracted.csv")

## Shape data

In [8]:
# Reading taxi zones data
zipshape = zipfile.ZipFile(open(data_path / "Taxi Data/taxizone_shapes"/ "NYC Taxi Zones.zip", 'rb'))
_=[print(filename) for filename in zipshape.namelist()]

geo_export_b8724ed5-0c88-4332-8cf5-b391eb2dcd40.dbf
geo_export_b8724ed5-0c88-4332-8cf5-b391eb2dcd40.shp
geo_export_b8724ed5-0c88-4332-8cf5-b391eb2dcd40.shx
geo_export_b8724ed5-0c88-4332-8cf5-b391eb2dcd40.prj


In [9]:
# Read shape and record data from shape files
dbfname, shpname, shxname, _ = zipshape.namelist()
r = shapefile.Reader(shp=io.BytesIO(zipshape.read(shpname)),
                     shx=io.BytesIO(zipshape.read(shxname)),
                     dbf=io.BytesIO(zipshape.read(dbfname)))

In [10]:
# Extract polygons from shape data
poly_points = [shape.points for shape in r.iterShapes()]

In [11]:
r.records()[0]

Record #0: ['EWR', 1.0, 1.0, 0.0007823067885, 0.116357453189, 'Newark Airport']

In [12]:
# Combine other attributes with polygon values
taxizone_shapedf = pd.concat((pd.DataFrame(np.array(r.records())), pd.Series(poly_points, name="polyshape")), axis = 1)
taxizone_shapedf.rename(columns={0:'borough', 5:'taxizone'}, inplace=True)

In [13]:
# Create a mapping of zone and its polygon
global poly_dict
poly_dict = {}
for i in range(len(poly_points)):
    zone_record = r.records()[i]
    zone = zone_record[5]
#     zone = taxizone_shapedf.loc[i, 'taxizone'] + ', ' + taxizone_shapedf.loc[i, 'borough']
#     poly = Polygon(taxizone_shapedf.loc[i, 'polyshape'])
    poly = Polygon(poly_points[i])
    poly_dict[zone] = poly

## Twitter data

In [14]:
select_cols = ['id', 'timestamp_ms', 'coordinates', 'text']

In [15]:
# There seems to be some columns that are string in the timestamp column due to those json records having incorrect structure
twitter_df_pd = pd.read_csv(data_path / "twitter data"/ "twitter_3mon_extracted.csv", sep=',', usecols=select_cols, error_bad_lines=False)
#, dtype={"id":"float", "timestamp_ms": "float", "coordinates": "str", "text": str}

print(f'Shape of DF: {twitter_df_pd.shape}')

# Convert columns to numberic; Coerce to convert strings to NaNs
twitter_df_pd['id'] = pd.to_numeric(twitter_df_pd["id"], errors='coerce').convert_dtypes()
twitter_df_pd['timestamp_ms'] = pd.to_numeric(twitter_df_pd["timestamp_ms"], errors='coerce')

# Skipping records where timestamp data is not correct
twitter_df_pd = twitter_df_pd[twitter_df_pd.id.notnull() & twitter_df_pd.timestamp_ms.notnull()]

twitter_df_pd.dtypes

  interactivity=interactivity, compiler=compiler, result=result)


Shape of DF: (1110940, 4)


id                Int64
timestamp_ms    float64
coordinates      object
text             object
dtype: object

In [16]:
from pyspark.conf import SparkConf

In [17]:
# Creating a spark dataframe from the pandas dataframe
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import udf, col, collect_list, concat_ws
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

spark = SparkSession.builder.master("local[1]")\
        .appName("twitterprocessing")\
        .getOrCreate()

In [18]:
customSchema = StructType([
  StructField("id", IntegerType(), True),
  StructField("timestamp_ms", FloatType(), True),
  StructField("coordinates", StringType(), True),
  StructField("text", StringType(), True)]
)

twitter_df_sp = spark.createDataFrame(twitter_df_pd)
twitter_df_sp.printSchema()

root
 |-- id: long (nullable = true)
 |-- timestamp_ms: double (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- text: string (nullable = true)



In [19]:
twitter_df_sp.count()

1110935

In [20]:
def todatetime(val):
  from datetime import datetime
  try:
    stamp = datetime.fromtimestamp(val/1000).strftime("%Y%m%d %H%M%S")
    return stamp
  except Exception as e:
    return None

def getZone(coord_dict):
    import json
    point = json.loads(str.replace(coord_dict, '\'', '"'))['coordinates']
    found = False
    tocheck = Point(point)
    foundZone = None
    for zone in poly_dict.keys():
        if poly_dict[zone].contains(tocheck):
            if not found:
                found = True
                foundZone = zone
            else:
                print("Found two zones! - " + zone + ";" + foundZone)
    return foundZone

In [21]:
datetimeConvertor = udf(lambda x: todatetime(x))
dateExtractor = udf(lambda x: x.split(' ')[0] if x is not None else None)
zoneMapper = udf(lambda x: getZone(x))

In [22]:
# Map timestamp to datetime, extract date and map to taxi zones
twitter_df_mod = twitter_df_sp.withColumn("timestamp_dt", datetimeConvertor(col("timestamp_ms"))) \
                              .withColumn("tweet_date", dateExtractor(col("timestamp_dt"))) \
                              .withColumn("taxizone", zoneMapper(col("coordinates"))) \

twitter_df_mod = twitter_df_mod.filter((twitter_df_mod.tweet_date >= '20181001') & (twitter_df_mod.tweet_date <= '20181231'))

In [23]:
twitter_df_mod.count()

1107314

In [28]:
twitter_df_mod.write.option('header', True).csv((data_path / 'twitter data' / 'twitter_3mon_withzonecsv').as_posix())

In [29]:
twitter_df_agg = twitter_df_mod.groupBy('tweet_date', 'taxizone').agg(concat_ws('. ', collect_list(twitter_df_mod.text)))

In [30]:
twitter_df_agg_pd = twitter_df_agg.toPandas()

KeyboardInterrupt: ignored

In [None]:
print(twitter_df_agg_pd.shape)
twitter_df_agg_pd.head

In [None]:
# Adding the taxi zone borough and service zone
taxi_zones = pd.read_csv(data_path / "Taxi Data" / "taxi_zone_lookup.csv")
twitter_df_final = pd.merge(twitter_df_agg_pd, taxi_zones[["Borough", "Zone", "service_zone"]], how = "inner", left_on = "taxizone", right_on = "Zone")

In [None]:
twitter_df_final.to_csv(data_path / "twitter data "/ "twitter_3mon_aggregated.csv")

## Data Prep for Topic model

In [None]:
# standardizing column names, should do it higher up the pipeline
col_names = ['tw_id', 'tw_date', 'tx_taxizone', 'tw_text', 'tx_borough', 'tx_zone', 'tx_service_zone']
twitter_df = pd.read_csv(data_path / "twitter data" / "twitter_processed_zonefiltered.csv", sep = ',', names=col_names, index_col=0, skiprows=1)

In [None]:
twitter_df = twitter_df[pd.isnull(twitter_df['tx_taxizone']) == False]

In [None]:
# Creating a spark dataframe from the pandas dataframe
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import udf, col, collect_list, concat_ws
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

spark = SparkSession.builder.master("local[1]")\
        .appName("twitterprocessing")\
        .getOrCreate()
        
twitter_df_sp = spark.createDataFrame(twitter_df)
twitter_df_sp.printSchema()

root
 |-- tw_date: long (nullable = true)
 |-- tx_taxizone: string (nullable = true)
 |-- tw_text: string (nullable = true)
 |-- tx_borough: string (nullable = true)
 |-- tx_zone: string (nullable = true)
 |-- tx_service_zone: string (nullable = true)



In [None]:
 import sys
sys.path.insert(0, '/content/drive/Shareddrives/taxi_trends/Experiments/Twitter Data/Topic Modeling/')

In [None]:
print("Cleaning data")
data_lemmatized = topic_model.cleanData(twitter_df)

Cleaning data
Sample tweets: ['Arrived safely in #nyc ✅ dave and trish mastered subway ✅ delicious meal at #redroosterharlem ✅ jazz jerk chicken r… https://t.co/bJa4J2XpUo', '[18:11:09] 68.174.236.158:3373 &gt;&gt; :23 (TCP:SYN)', '#Anothermiracle, YO-YO host the #Saluteher2018 and still has the most affectionate personality; nice kids kickin it… https://t.co/sGbBZv8Lfd', 'The Daily Mantra: Welcome to October via  @diddy #cantstopwontstop #wordstoliveby #newday #modernmonday… https://t.co/CxBToZGJ7M', 'ARE YOU WOKE...??\n.\nBARACK OBAMA ENCOURAGES US TO #TAKEITBACK AND #YESWECAN...\n.\n#VOTE ON NOVEMBER 6TH - PLEASE!!\n.… https://t.co/dGPGCtCfpO']
[['arrived', 'safely', 'in', 'nyc', 'dave', 'and', 'trish', 'mastered', 'subway', 'delicious', 'meal', 'at', 'jazz', 'jerk', 'chicken']]
['arrived', 'safely', 'in', 'nyc', 'dave', 'and', 'trish', 'mastered', 'subway', 'delicious', 'meal', 'at', 'jazz', 'jerk_chicken']
[['arrive', 'safely', 'master', 'meal', 'jazz', 'jerk_chicken']]


In [None]:
import nltk; nltk.download('stopwords')
import gensim
import gensim.corpora as corpora
from gensim.utils import simple_preprocess

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# Util functions for cleaning
def sent_to_words(sentence):
  return ()  # deacc=True removes punctuations

In [None]:
# Util functions for cleaning
def sent_to_words(sentences):
    for sentence in sentences:
        yield(gensim.utils.simple_preprocess(str(sentence), deacc=True))  # deacc=True removes punctuations

# Define functions for stopwords, bigrams, trigrams and lemmatization
def remove_stopwords(texts):
    # NLTK Stop words
    from nltk.corpus import stopwords
    stop_words = stopwords.words('english')
    stop_words.extend(['from', 'subject', 're', 'edu', 'use'])

    return [[word for word in simple_preprocess(str(doc)) if word not in stop_words] for doc in texts]

def make_bigrams(texts, bigram_mod):
    return [bigram_mod[doc] for doc in texts]

def make_trigrams(texts, trigram_mod, bigram_mod):
    return [trigram_mod[bigram_mod[doc]] for doc in texts]

def lemmatization(texts, nlp, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV']):
    """https://spacy.io/api/annotation"""
    texts_out = []
    for sent in texts:
        doc = nlp(" ".join(sent)) 
        texts_out.append([token.lemma_ for token in doc if token.pos_ in allowed_postags])
    return texts_out

# from pyspark.sql.functions import pandas_udf
# from pyspark.sql.types import ArrayType, StringType
# @pandas_udf(ArrayType(StringType()))
def cleanData(tweets):
    # Enable logging for gensim - optional
    import logging
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.ERROR)

    import warnings
    warnings.filterwarnings("ignore",category=DeprecationWarning)
    import regex as re
    import spacy

    link_regex = "(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,6})([\/\w \.-]*)"

    data = tweets.values.tolist()
    print(f"Sample tweets: {data[:5]}")

    data = [re.sub(link_regex, "", sent) for sent in data]


    data_words = list(sent_to_words(data))
    print(data_words[:1])
    
    # Build the bigram and trigram models
    bigram = gensim.models.Phrases(data_words, min_count=5, threshold=100)
    trigram = gensim.models.Phrases(bigram[data_words], threshold=100)
    bigram_mod = gensim.models.phrases.Phraser(bigram)
    trigram_mod = gensim.models.phrases.Phraser(trigram)

    # Removing stop words
    data_words_nostops = remove_stopwords(data_words)

    # Form Bigrams
    data_words_bigrams = make_bigrams(data_words_nostops, bigram_mod)

    # Initialize spacy 'en' model, keeping only tagger component (for efficiency)
    # python3 -m spacy download en
    nlp = spacy.load("en_core_web_sm", disable=['parser', 'ner'])
    # Obsolete
#     nlp = spacy.load('en', disable=['parser', 'ner'])

    # Do lemmatization keeping only noun, adj, vb, adv
    data_lemmatized = lemmatization(data_words_bigrams, nlp, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV'])

    print(data_lemmatized[:1])
    
    return pd.Series(data_lemmatized)

In [None]:
# Sorting by date before lemmatization
twitter_df.sort_values(by=['tw_date', 'tx_borough', 'tx_zone', 'tx_taxizone'])
twitter_df['tw_text_lem'] = cleanData(twitter_df.tw_text)

Sample tweets: ['Baychester NY Mon Oct 1st PM Forecast: TONIGHT Mostly Cloudy Lo 65 TUESDAY Chance Of T-Storm Hi 79. Esplanade NY Mon Oct 1st PM Forecast: TONIGHT Mostly Cloudy Lo 65 TUESDAY Chance Of T-Storm Hi 79. Hillside NY Mon Oct 1st PM Forecast: TONIGHT Mostly Cloudy Lo 65 TUESDAY Chance Of T-Storm Hi 79. Your smile makes me wanna eat your pussy.. Spooky Hours.. Baychester NY Mon Oct 1st AM Forecast: TODAY Mostly Cloudy Hi 79 TONIGHT Mostly Cloudy Lo 65. Esplanade NY Mon Oct 1st AM Forecast: TODAY Mostly Cloudy Hi 79 TONIGHT Mostly Cloudy Lo 65. Hillside NY Mon Oct 1st AM Forecast: TODAY Mostly Cloudy Hi 79 TONIGHT Mostly Cloudy Lo 65. A lil rough around the edges but I keep it smooth.. EACH &amp; EVERY MONDAY\n"VIBES MONDAY"\nAT JAMEX SPORT\'S BAR\nCOME OUT EARLY &amp; STAY LATE !!\n#reggae #dancehall #hiphop… https://t.co/WOIxrRXhhf. 🌐⏯⚠️🎼 #CLEARMiND and I have #acomplished a new #goal .... #Rappers #singers &amp; #artist #producers Stay on ya 1️⃣,… https://t.co/K5tNrrYdsF', '



['baychester', 'ny', 'mon_oct', 'st', 'pm_forecast_tonight', 'mostly_cloudy_lo', 'tuesday', 'chance', 'of', 'storm_hi', 'esplanade', 'ny', 'mon_oct', 'st', 'pm_forecast_tonight', 'mostly_cloudy_lo', 'tuesday', 'chance', 'of', 'storm_hi', 'hillside', 'ny', 'mon_oct', 'st', 'pm_forecast_tonight', 'mostly_cloudy_lo', 'tuesday', 'chance', 'of', 'storm_hi', 'your', 'smile', 'makes', 'me', 'wanna', 'eat', 'your', 'pussy', 'spooky', 'hours', 'baychester', 'ny', 'mon_oct', 'st', 'am_forecast_today_mostly', 'cloudy_hi', 'tonight_mostly_cloudy', 'lo_esplanade', 'ny', 'mon_oct', 'st', 'am_forecast_today_mostly', 'cloudy_hi', 'tonight_mostly_cloudy', 'lo', 'hillside', 'ny', 'mon_oct', 'st', 'am_forecast_today_mostly', 'cloudy_hi', 'tonight_mostly_cloudy', 'lo', 'lil', 'rough', 'around', 'the', 'edges', 'but', 'keep', 'it', 'smooth', 'each', 'amp', 'every', 'monday', 'vibes', 'monday', 'at', 'jamex_sport_bar', 'come', 'out', 'early', 'amp', 'stay', 'late', 'reggae_dancehall', 'hiphop', 'clearmind',

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [None]:
twitter_df = twitter_df[pd.isnull(twitter_df['tw_text_lem']) == False]

In [None]:
twitter_df.to_pickle(data_path / 'twitter data' / 'data_lemmatized_3mon.pkl')