In [5]:
# Uncomment to install these libraries
!pip install pandas
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 23.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=3ef1faa0e073fdcf161087c9e25f1a0255e707cb1180351b8afb3c22185351f0
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, size
from pyspark.sql.types import ArrayType, StringType, IntegerType
import pyspark.sql.functions as F

from pyspark import SparkFiles

from pprint import pprint

In [7]:
spark = SparkSession.builder.appName("Tweet Analysis").getOrCreate()

# Data Ingestion

In [None]:
url = "https://s3.amazonaws.com/peyck.es/BDA_Project/oct18-oct19.csv"
spark.sparkContext.addFile(url)


In [None]:
raw_df = spark.read.csv("file://"+SparkFiles.get("oct18-oct19.csv"), header=True, inferSchema= True)

In [None]:
raw_df.show()

+-----------+---------------+---------------+-------+--------+---------+--------------------+----+--------------------+--------------------+--------+--------------------+
|       date|       username|             to|replies|retweets|favorites|                text| geo|            mentions|            hashtags|      id|           permalink|
+-----------+---------------+---------------+-------+--------+---------+--------------------+----+--------------------+--------------------+--------+--------------------+
|4/1/19 0:00|  SneakerShouts|           null|      0|       2|       24|"STEAL: Over 50% ...|null|                null|                null|1.11E+18|https://twitter.c...|
|4/1/19 0:00|Ebay_Birmingham|           null|      0|       0|        0|Check out #Nike D...|null|               @eBay|#Nike #Nike #ebay...|1.11E+18|https://twitter.c...|
|4/1/19 0:00|    SneakerNews|           null|      2|      15|      102|"The Nike Air Max...|null|                null|                null|1.11E

# Data Cleaning & Data Validation

## Handling None Values

In [None]:
def check_for_nan(df, subset=None):
    if subset:
        df = df.select(subset)
    for column in df.columns:
        df = df.withColumn(column, df["`{}`".format(column)].cast("string"))
    nan_df = df.select(
        [
            count(
                when(
                    col(c).contains("None")
                    | col(c).contains("NULL")
                    | col(c).contains("NaN")
                    | (col(c) == "")
                    | col(c).isNull()
                    | isnan(c),
                    c,
                )
            ).alias(c)
            for c in df.columns
        ]
    )
    return nan_df.toPandas()

In [None]:
nans_in_df = check_for_nan(raw_df)

In [None]:
print(f"Total number of samples: {raw_df.count()}")

Total number of samples: 4430893


In [None]:
nans_in_df.head()

Unnamed: 0,date,username,to,replies,retweets,favorites,text,geo,mentions,hashtags,id,permalink
0,0,1307,2984222,0,0,0,1274,4396600,3378046,3298391,24541,10273


From a quick look, it seems:
1. More than half of the "to" feature values are None.
2. Almost all of the "geo" feature values are None.
3. Most of the mentions and hashtags are None.

From the above data, we can conclude that most users tweeting about Nike don't address the tweet to any single entity. They also don't include any geo location to their tweet. And they don't use mentions or hashtags when tweeting about Nike.

**1. Because most of the values within geo feature are None, so we're going to drop "geo" from the dataframe.**

**2. We cannot drop rows where feature "to", "mentions" & "hashtags" are None because a lot of samples are missing these values. We can replace None with empty string for "to", "mentions", "hashtags" later.**

In [None]:
mostly_nan_features = 'geo'
df = raw_df.drop(mostly_nan_features)

In [None]:
df.show()

+-----------+---------------+---------------+-------+--------+---------+--------------------+--------------------+--------------------+--------+--------------------+
|       date|       username|             to|replies|retweets|favorites|                text|            mentions|            hashtags|      id|           permalink|
+-----------+---------------+---------------+-------+--------+---------+--------------------+--------------------+--------------------+--------+--------------------+
|4/1/19 0:00|  SneakerShouts|           null|      0|       2|       24|"STEAL: Over 50% ...|                null|                null|1.11E+18|https://twitter.c...|
|4/1/19 0:00|Ebay_Birmingham|           null|      0|       0|        0|Check out #Nike D...|               @eBay|#Nike #Nike #ebay...|1.11E+18|https://twitter.c...|
|4/1/19 0:00|    SneakerNews|           null|      2|      15|      102|"The Nike Air Max...|                null|                null|1.11E+18|https://twitter.c...|
|4/1

In [None]:
# Replacing None with empty string
none_to_empty_string_features = ['to', 'mentions', 'hashtags']
df = df.fillna('', subset=none_to_empty_string_features)

## Checking for Data Types

In [None]:
pprint(dict(df.dtypes))

{'date': 'string',
 'favorites': 'int',
 'hashtags': 'string',
 'id': 'string',
 'mentions': 'string',
 'permalink': 'string',
 'replies': 'int',
 'retweets': 'int',
 'text': 'string',
 'to': 'string',
 'username': 'string'}


As you can see, most of the features are either string or int. However, it doesn't make sense for the "date" feature to be string. We need to change "date" feature from string to date type.

We also need to clean "mentions" and "hashtags".

In [None]:
def convert_timestamp(df, timestamp_column, replace_original=True):
    df_with_timestamp = df.withColumn(
        "timestamp", F.to_timestamp(timestamp_column, "M/d/yy H:mm")
    )

    # Making sure timestamp conversion took place accurately without creating any new NaN values
    nan_df = check_for_nan(df_with_timestamp, subset=["timestamp"])
    if nan_df["timestamp"].values[0] > 0:
        raise ValueError(
            f"Conversion of {timestamp_column} to type timestamp generated unexpected"
            f"NaN values."
        )

    if replace_original:
        df_with_timestamp = df_with_timestamp.drop(timestamp_column)
        df_with_timestamp = df_with_timestamp.withColumnRenamed(
            "timestamp", timestamp_column
        )
    return df_with_timestamp

In [None]:
# Convert date to timestamp type
df = convert_timestamp(df, "date")

In [None]:
def remove_empty_strings(list_of_strings):
  return [string for string in list_of_strings if string]
clean_list = F.udf(remove_empty_strings, ArrayType(StringType()))

def break_into_list(df, feature, special_identifier):
  df = df.withColumn(feature, clean_list(F.split(feature, special_identifier)))
  return df

In [None]:
# Converting 'mentions' and 'hashtags' from string to list of string
df = break_into_list(df, 'hashtags', '#')
df = break_into_list(df, 'mentions', '@')

In [None]:
df.show()

+---------------+---------------+-------+--------+---------+--------------------+--------------------+--------------------+--------+--------------------+-------------------+
|       username|             to|replies|retweets|favorites|                text|            mentions|            hashtags|      id|           permalink|               date|
+---------------+---------------+-------+--------+---------+--------------------+--------------------+--------------------+--------+--------------------+-------------------+
|  SneakerShouts|               |      0|       2|       24|"STEAL: Over 50% ...|                  []|                  []|1.11E+18|https://twitter.c...|2019-04-01 00:00:00|
|Ebay_Birmingham|               |      0|       0|        0|Check out #Nike D...|              [eBay]|[Nike , Nike , eb...|1.11E+18|https://twitter.c...|2019-04-01 00:00:00|
|    SneakerNews|               |      2|      15|      102|"The Nike Air Max...|                  []|                  []|1.11E+1

In [None]:
pprint(df.dtypes)

[('username', 'string'),
 ('to', 'string'),
 ('replies', 'int'),
 ('retweets', 'int'),
 ('favorites', 'int'),
 ('text', 'string'),
 ('mentions', 'array<string>'),
 ('hashtags', 'array<string>'),
 ('id', 'string'),
 ('permalink', 'string'),
 ('date', 'timestamp')]


We've succesfully changed the data types to data types more suitable for analysis.

## Dropping Unwanted Features

There are some columns which are purely randomly assigned to a tweet and are of no significance to our analysis. We should drop these columns. These columns are:
1. "id" feature: because ids are randomly assigned.
2. "permanlik" feature: link to tweet doesn't affect the analysis.

In [None]:
unwanted_features = ("id", "permalink")
df = df.drop(*unwanted_features)

# Data Pre-processing

Our hypothesis was **"Given a tweet, we can predict the number of engagements."** Let's prepare the dataset for the same.

## Creating Labels

We're predicting engagements. In our case engagement would be the sum of "replies", "retweets & "favourites" features.

In [None]:
df = df.withColumn("engagement", F.col("replies") + F.col("retweets") + F.col("favorites"))

In [None]:
label_columns = ('replies', 'retweets', 'favorites')
df = df.drop(*label_columns)

In [None]:
df.show()

+---------------+---------------+--------------------+--------------------+--------------------+-------------------+----------+
|       username|             to|                text|            mentions|            hashtags|               date|engagement|
+---------------+---------------+--------------------+--------------------+--------------------+-------------------+----------+
|  SneakerShouts|               |"STEAL: Over 50% ...|                  []|                  []|2019-04-01 00:00:00|        26|
|Ebay_Birmingham|               |Check out #Nike D...|              [eBay]|[Nike , Nike , eb...|2019-04-01 00:00:00|         0|
|    SneakerNews|               |"The Nike Air Max...|                  []|                  []|2019-04-01 00:00:00|       119|
|     titoloshop|               |Nike Air Vapormax...|                  []|                  []|2019-04-01 00:00:00|         0|
|   Taniabanks24|BornLiveDieFree|@MichaelAvenatti ...|   [MichaelAvenatti]|                  []|2019-04-

## Building Features

## Extracting Information From Hashtag

In [None]:
df = df.withColumn('number_hashtags', size(F.col('hashtags')))

# Since hashtags are already part of the main text. We can drop the actual hashtag.
df = df.drop('hashtags')

## Extracting Information From Mentions

In [None]:
df = df.withColumn('number_mentions', size(F.col('mentions')))

# Since hashtags are already part of the main text. We can drop the actual hashtag.
df = df.drop('mentions')

In [None]:
df.show()

+---------------+---------------+--------------------+-------------------+----------+---------------+---------------+
|       username|             to|                text|               date|engagement|number_hashtags|number_mentions|
+---------------+---------------+--------------------+-------------------+----------+---------------+---------------+
|  SneakerShouts|               |"STEAL: Over 50% ...|2019-04-01 00:00:00|        26|              0|              0|
|Ebay_Birmingham|               |Check out #Nike D...|2019-04-01 00:00:00|         0|              5|              1|
|    SneakerNews|               |"The Nike Air Max...|2019-04-01 00:00:00|       119|              0|              0|
|     titoloshop|               |Nike Air Vapormax...|2019-04-01 00:00:00|         0|              0|              0|
|   Taniabanks24|BornLiveDieFree|@MichaelAvenatti ...|2019-04-01 00:00:00|         2|              0|              1|
|         bodega|               |Nike Moon Racer Q...|20

### Converting "username" & "to" feature to one-hot encoded feature

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def one_hot_encode(df, feature):
  df = df.withColumn(feature, when(F.col(feature) == '', None).otherwise(F.col(feature)))

  # Map strings to numbers with string indexer
  string_indexer = StringIndexer(inputCol=feature, outputCol=f'{feature}_index')
  indexed_df = string_indexer.fit(df).transform(df)

  # Onehot encode indexed values
  encoder = OneHotEncoder(inputCol=f'{feature}_index', outputCol=f'{feature}_vec')
  encoded_df = encoder.fit(indexed_df).transform(indexed_df)

  extra_column = (feature, f"{feature}_index")
  encoded_df = encoded_df.drop(*extra_column)

  return encoded_df

In [None]:
# df = one_hot_encode(df, 'username')

In [None]:
# df = one_hot_encode(df, 'to')

In [None]:
# df.show()

## Processing Twitter Text

In [None]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = df.withColumn('text', regexp_replace(df.text, '[_():;,.@#\/!%\+?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+---------------+---+----------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+---------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|username       |to |text                                                                                                                                          |date               |engagement|number_hashtags|number_mentions|words                                                                                                                                                                     |
+---------------+---+----------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+-------

In [None]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(wrangled)

# Apply the hashing trick
wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)\
      .transform(wrangled)

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')\
      .fit(wrangled).transform(wrangled)
      
tf_idf.select('terms', 'features').show(4, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|terms                                                                                                                                                                |features                                                                                                                                

In [None]:
tf_idf.show(4, truncate=False)

+---------------+---+----------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+---------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Convert text to embedding
# Convert hashtags to one-hot encoding
# Convert username to one-hot encoding
# Convert "to" to one-hot encoding
# Convert date

# Model Training

In [None]:
trainable_tf_id = tf_idf.select('username', 'number_hashtags', 'number_mentions', 'features', 'engagement')

In [None]:
trainable_tf_id.show()

+---------------+---------------+---------------+--------------------+----------+
|       username|number_hashtags|number_mentions|            features|engagement|
+---------------+---------------+---------------+--------------------+----------+
|  SneakerShouts|              0|              0|(1024,[120,193,22...|        26|
|Ebay_Birmingham|              5|              1|(1024,[5,33,102,1...|         0|
|    SneakerNews|              0|              0|(1024,[1,27,83,10...|       119|
|     titoloshop|              0|              0|(1024,[120,208,21...|         0|
|   Taniabanks24|              0|              1|(1024,[18,49,179,...|         2|
|         bodega|              3|              0|(1024,[140,171,22...|        11|
|   mitasneakers|              0|              0|(1024,[63,120,145...|         3|
|  CentervilleCF|              0|              0|(1024,[192,286,34...|         2|
| BBNSportsKSCDJ|              0|              0|(1024,[265,355,68...|         0|
|    SoleInsider

In [None]:
#**Just for testing purpose **

# Import LinearRegression
from pyspark.ml.regression import LinearRegression

# Split the data into training and testing sets
tf_train, tf_test = trainable_tf_id.randomSplit([0.999, 0.001], seed=13)

# Fit a Logistic Regression model to the training data
linear = LinearRegression(labelCol='engagement').fit(tf_test)

# Make predictions on the testing data
prediction = linear.transform(tf_test)
#y_pred = LinearRegression.predict(tf_test)
# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('username', 'engagement').count().show()

+---------------+----------+-----+
|       username|engagement|count|
+---------------+----------+-----+
|    ClosEMSmile|        22|    1|
|       BluThief|         0|    1|
|  ChrisShaw20KC|         0|    1|
| kutsushita___3|         9|    1|
|Hg1AWnAVVZcbSGq|         1|    1|
|      juliush23|         1|    1|
| lionslogistics|         2|    1|
|superyeezyboost|         1|    1|
| KaylieAbelaArt|         0|    1|
|Shop_FINAL_FOUR|         1|    1|
|     ayeeeeeric|         2|    1|
|       Griouard|         1|    1|
|  IsaiahDavid96|         0|    1|
|     snoozie223|         2|    1|
|      MrTamPham|         0|    1|
|DlameBuhlebethu|         2|    1|
|    Inkfacefahz|         4|    1|
|     marlatroll|         0|    1|
|      FSURising|         0|    1|
|  SoleTalkDaily|         0|    6|
+---------------+----------+-----+
only showing top 20 rows



Logistic algorithm


In [None]:
val seed = 5000
val Array(tf_train, tf_test) = trainable_tf_id.randomSplit(Array(0.6, 0.4), seed)

// train logistic regression model with training data set
val logisticRegression = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.02)
  .setElasticNetParam(0.8)
val logisticRegressionModel = logisticRegression.fit(tf_train)

// run model with test data set to get predictions
// this will add new columns rawPrediction, probability and prediction
val predictionDf = logisticRegressionModel.transform(tf_test)
predictionDf.show(10)

In [None]:
# evaluate model with area under ROC
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("engagement")
  .setRawPredictionCol("username")
  .setMetricName("areaUnderROC")

# measure the accuracy
val accuracy = evaluator.evaluate(predictionDf)
println(accuracy)

