# Processing Venmo Transactions with Spark

### Goal
Segment users into groups according to how they most regularly use Venmo, based solely on their transaction notes. The segments are meant to support lead generation (think Cash App, Zelle, Wells Fargo, and other money-transfer applications) and advertising targeting.

## Results

## Methodology

### Creating Spark Session

#### Installing Necessary Tools for Spark

Running Spark through a Jupyter notebook requires some tooling. I installed Spark, Scala, and Hadoop on my local machine following these instructions:
https://medium.com/big-data-engineering/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3
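When called with no arguments, `findspark.init()` looks first at the `SPARK_HOME` environment variable. A quick pre-flight check can therefore save a confusing import error. The sketch below assumes the install guide sets `SPARK_HOME` and `JAVA_HOME`, plus `HADOOP_HOME` on Windows. The helper name `missing_env_vars` and the variable list are illustrative assumptions, not part of findspark.

```python
import os
from typing import Iterable, List, Mapping

# Assumed variable names; adjust to match your own install (HADOOP_HOME is mainly a Windows concern)
REQUIRED_VARS = ("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME")


def missing_env_vars(env: Mapping[str, str],
                     required: Iterable[str] = REQUIRED_VARS) -> List[str]:
    """Return the required variables that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(os.environ)
    if missing:
        print("Set these before calling findspark.init():", ", ".join(missing))
    else:
        print("Spark-related environment variables look set.")
```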


#### Testing Spark Functionality

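```python
import random

import findspark
findspark.init()  # locate the local Spark install so pyspark can be imported

import pyspark

sc = pyspark.SparkContext(appName="Pi")
num_samples = 1000000


def inside(_):
    # Draw a random point in the unit square; True if it lands inside the quarter circle
    x, y = random.random(), random.random()
    return x * x + y * y < 1


# Count the sampled points that land inside the quarter circle
count = sc.parallelize(range(0, num_samples)).filter(inside).count()

# The quarter circle covers pi/4 of the unit square
pi = 4 * count / num_samples
print(pi)

sc.stop()
```

```
3.140872
```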
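The same Monte Carlo estimate can be reproduced without Spark, which makes it easy to confirm that the cluster result is plausible. Below is a minimal sketch with a seeded generator so runs are repeatable. The function name `estimate_pi` is hypothetical.

```python
import math
import random


def estimate_pi(num_samples: int, seed: int = 0) -> float:
    """Monte Carlo estimate of pi: the share of random points in the unit
    square that land inside the quarter circle approximates pi / 4."""
    rng = random.Random(seed)  # seeded so runs are reproducible
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / num_samples


if __name__ == "__main__":
    estimate = estimate_pi(1_000_000)
    print(estimate, abs(estimate - math.pi))
```

With a million samples the standard error is roughly 0.002, so results like the 3.140872 above are within normal sampling noise.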


#### Initializing Spark Session

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession

# Reuse an active SparkContext if there is one, otherwise create one
sc = SparkContext.getOrCreate(SparkConf().setAppName("venmo_transactions"))
spark = SparkSession(sc)

# Inspect the resulting configuration
spark.sparkContext.getConf().getAll()
```

```
[('spark.app.id', 'local-1590950446519'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'vm-cbi-12oe3f'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '63769'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'venmo_transactions')]
```

### Importing Data

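```python
# Path to the exported transactions CSV on the author's machine
csv = 'C:\\Users\\Stuart\\Documents\\GitHub\\venmo\\data\\output\\transactions_full.csv'

# inferSchema=True costs one extra pass over the data to guess column types
transactions_df = spark.read.csv(csv, header=True, inferSchema=True)
```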

#### Verifying Data Integrity

Checking data integrity is good practice with any dataset. Here I check the first row, the total row count, the column data types, and the number of null values in each column.

```python
print('First Row : \n\n', transactions_df.first())

print('\n\nLength of dataframe: \n\n', transactions_df.count(), 'records')
```

```
First Row : 

 Row(transaction_id=2540405007077868184, actor_user_id=2482900494712832556, target_user_id=None, target_type='user', overall_type='payment', transaction_note="b'fuk ya'", date_created='2018-08-07 02:11:16')


Length of dataframe: 

 7076584 records
```


```python
transactions_df.printSchema()
```

```
root
 |-- transaction_id: long (nullable = true)
 |-- actor_user_id: long (nullable = true)
 |-- target_user_id: string (nullable = true)
 |-- target_type: string (nullable = true)
 |-- overall_type: string (nullable = true)
 |-- transaction_note: string (nullable = true)
 |-- date_created: string (nullable = true)
```



```python
# Count null values per column
# (aliasing sum avoids shadowing Python's built-in sum)
from pyspark.sql.functions import col, sum as spark_sum

# isNull() yields True/False; cast to int (1/0) and sum to count nulls per column
transactions_df.select(
    *(spark_sum(col(c).isNull().cast("int")).alias(c) for c in transactions_df.columns)
).show()
```

```
+--------------+-------------+--------------+-----------+------------+----------------+------------+
|transaction_id|actor_user_id|target_user_id|target_type|overall_type|transaction_note|date_created|
+--------------+-------------+--------------+-----------+------------+----------------+------------+
|             0|            0|       7076584|          0|           0|               0|          43|
+--------------+-------------+--------------+-----------+------------+----------------+------------+
```

`target_user_id` is null in all 7,076,584 rows, which likely explains why `inferSchema` typed it as a string. In addition, 43 rows are missing `date_created`.
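The `cast("int")` trick works because each boolean null flag becomes 1 or 0, so summing the flags counts the nulls. Here is the same idea in plain Python, run over a few made-up rows shaped like the schema above. `null_counts` is an illustrative helper, not part of PySpark.

```python
from typing import Dict, List, Optional


def null_counts(rows: List[Dict[str, Optional[object]]]) -> Dict[str, int]:
    """Count None values per column by summing int(value is None)."""
    columns = rows[0].keys() if rows else []
    return {c: sum(int(row.get(c) is None) for row in rows) for c in columns}


# Hypothetical sample rows; values are made up for illustration
sample = [
    {"transaction_id": 1, "target_user_id": None, "date_created": "2018-08-07 02:11:16"},
    {"transaction_id": 2, "target_user_id": None, "date_created": None},
    {"transaction_id": 3, "target_user_id": None, "date_created": "2018-08-07 02:11:17"},
]

if __name__ == "__main__":
    print(null_counts(sample))
```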



#### Updating Data Types and Filtering Nulls

```python
# Convert date_created from string to timestamp, stored in a new column
from pyspark.sql.types import TimestampType

transactions_df = transactions_df.withColumn(
    "datetime_created", transactions_df['date_created'].cast(TimestampType()))

# Verify the conversion
transactions_df.printSchema()
```

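```python
# Drop rows whose date_created is null
transactions_df = transactions_df.filter(transactions_df.date_created.isNotNull())

# Check the new row count
transactions_df.count()
```

```
7076541
```

This removes exactly the 43 rows with a null `date_created` (7,076,584 minus 7,076,541).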
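Filtering on `date_created` only removes rows whose original string is missing. In Spark's default non-ANSI mode (Spark 2.x and 3.x), casting a malformed string to a timestamp yields null instead of raising an error. A row can therefore pass the filter and still have a null `datetime_created`. Filtering on `datetime_created.isNotNull()` would catch both cases. Below is a plain-Python sketch of this parse-or-null behavior, assuming the `YYYY-MM-DD HH:MM:SS` format seen in the data. `parse_or_none` and `keep_parseable` are hypothetical helpers.

```python
from datetime import datetime
from typing import List, Optional

# Format observed in the exported data, e.g. '2018-08-07 02:11:16'
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_or_none(value: Optional[str]) -> Optional[datetime]:
    """Mimic a lenient cast: return None instead of raising on bad input."""
    if value is None:
        return None
    try:
        return datetime.strptime(value.strip(), DATE_FORMAT)
    except ValueError:
        return None


def keep_parseable(values: List[Optional[str]]) -> List[datetime]:
    """Filter on the parsed value, so malformed strings are dropped too."""
    parsed = (parse_or_none(v) for v in values)
    return [p for p in parsed if p is not None]


if __name__ == "__main__":
    raw = ["2018-08-07 02:11:16", None, "not a date"]
    print(keep_parseable(raw))
```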

### Transforming the Data

Before modeling topics, the transaction notes need cleaning. As in many NLP pipelines, I strip export artifacts and stray characters, tokenize each note, remove stop words, and stem the remaining tokens.
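Here is a plain-Python sketch of the cleaning steps, applied in the same order as the Spark cell: colons to spaces, strip the `b'` export prefix, collapse repeated whitespace, drop single quotes, and trim. With `anchor_prefix=True` the prefix pattern is anchored with `^`. Without the anchor, `b'` also matches inside words such as "Bob's". The function name `clean_note` is hypothetical.

```python
import re


def clean_note(raw: str, anchor_prefix: bool = True) -> str:
    """Approximate the note-cleaning steps from the Spark cell in plain Python."""
    note = raw.replace(":", " ")                 # emoji-shortcode colons -> spaces
    prefix = r"^b'" if anchor_prefix else r"b'"  # bytes-repr export artifact
    note = re.sub(prefix, "", note)
    note = re.sub(r"\s+", " ", note)             # collapse runs of whitespace
    note = note.replace("'", "")                 # drop single quotes
    return note.strip()                          # trim leading/trailing space


if __name__ == "__main__":
    print(clean_note("b':venmo_dollar:'"))
    print(clean_note("b'Bob's :pizza: night'"))
    print(clean_note("b'Bob's :pizza: night'", anchor_prefix=False))
```

The last call shows the unanchored pattern turning "Bob's" into "Bos".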

```python
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import col, udf, regexp_replace, trim
from pyspark.sql.types import ArrayType, StringType
```


```python
# Replace the colons around emoji shortcodes (e.g. :venmo_dollar:) with spaces
emoji_spaced_df = transactions_df.withColumn(
    'spaced_note', regexp_replace('transaction_note', ':', ' '))

# Remove the leading "b'" left on note strings by an earlier export step;
# anchoring with ^ keeps words such as "Bob's" intact
dropped_b_df = emoji_spaced_df.withColumn(
    'note_no_b', regexp_replace('spaced_note', "^b'", ''))

# Collapse runs of whitespace into a single space
single_sp_df = dropped_b_df.withColumn('note_ss',
                                       regexp_replace('note_no_b', r'\s+', ' '))

# Drop single quotes
single_q_df = single_sp_df.withColumn('note_sq',
                                      regexp_replace('note_ss', "'", ''))

# Trim leading and trailing whitespace
trimmed_ws_df = single_q_df.withColumn('trimmed_note', trim(col('note_sq')))

# Tokenize the notes (lowercases, then splits on whitespace)
tokenizer = Tokenizer(inputCol='trimmed_note', outputCol='note_tokens')
tokenized_df = tokenizer.transform(trimmed_ws_df).select(
    'transaction_id', 'spaced_note', 'note_no_b', 'trimmed_note',
    'note_tokens', 'date_created')

# Remove stop words from the tokens
remover = StopWordsRemover(inputCol='note_tokens',
                           outputCol='note_sans_stopwords')
df_sans_stopwords = remover.transform(tokenized_df).select(
    'transaction_id', 'spaced_note', 'note_no_b', 'trimmed_note',
    'note_tokens', 'note_sans_stopwords', 'date_created')

# Stem each token with NLTK's Snowball stemmer inside a Python UDF
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(
    lambda tokens: [stemmer.stem(t) for t in tokens] if tokens is not None else None,
    ArrayType(StringType()))

df_stemmed = df_sans_stopwords.withColumn(
    "note_stemmed", stemmer_udf("note_sans_stopwords")).select(
        'transaction_id', 'spaced_note', 'note_no_b', 'trimmed_note',
        'note_tokens', 'note_sans_stopwords', 'note_stemmed', 'date_created')

df_stemmed.show()
```

```
+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|     transaction_id|         spaced_note|           note_no_b|        trimmed_note|         note_tokens| note_sans_stopwords|        note_stemmed|       date_created|
+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|2540405007077868184|           b'fuk ya'|             fuk ya'|              fuk ya|           [fuk, ya]|           [fuk, ya]|           [fuk, ya]|2018-08-07 02:11:16|
|2540405006884930468|     b' automobile '|        automobile '|          automobile|        [automobile]|        [automobile]|         [automobil]|2018-08-07 02:11:16|
|2540405007379857710|   b' venmo_dollar '|      venmo_dollar '|        venmo_dollar|      [venmo_dollar]|      [venmo_dollar]|      [venmo_dollar]|2018-08-07 02
```
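Spark's `Tokenizer` lowercases each note and splits it on single whitespace characters. A leftover double space therefore produces an empty-string token, which is why the cleaning steps collapse whitespace and trim first. `RegexTokenizer`, with its default `\s+` pattern, avoids the issue. `StopWordsRemover` then drops tokens found in its English stop-word list, ignoring case by default. The plain-Python approximation below uses a tiny hypothetical stop-word list in place of Spark's built-in one. It omits stemming because that step needs NLTK. All function names are illustrative.

```python
import re
from typing import Iterable, List

# Hypothetical stand-in for Spark's much longer default English stop-word list
STOP_WORDS = {"a", "for", "the", "and", "my", "to"}


def spark_like_tokenize(note: str) -> List[str]:
    """Approximate Tokenizer: lowercase, then split on each whitespace char.

    Java's String.split drops trailing empty strings, so do the same here.
    """
    tokens = re.split(r"\s", note.lower())
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def regex_tokenize(note: str) -> List[str]:
    """Approximate RegexTokenizer with its default \\s+ gaps pattern."""
    return [t for t in re.split(r"\s+", note.lower()) if t]


def remove_stop_words(tokens: Iterable[str], stop_words=STOP_WORDS) -> List[str]:
    """Case-insensitive stop-word removal, like StopWordsRemover's default."""
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]


if __name__ == "__main__":
    note = "Tickets for  the game"
    print(spark_like_tokenize(note))               # contains an empty token
    print(remove_stop_words(regex_tokenize(note)))
```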

### Visualizing User Trends: Transaction Frequency

#### Most Frequent Users

```python
# Count transactions per sending user and list the most active first
agg = transactions_df.groupby('actor_user_id').count()

agg.orderBy(['count'], ascending=False).show()
```

```
+-------------------+-----+
|      actor_user_id|count|
+-------------------+-----+
|2135126842540032910|  359|
|1997633002405888173|  297|
|2021366932963328676|  235|
|2330070962143232368|  215|
|2048934239272960558|  187|
|2421457825038336069|  130|
|2181423544926208031|   91|
|1564104179318784848|   86|
|2411768840192000987|   63|
|1271828970471424398|   60|
|1874509636304896473|   58|
|1856331942199296981|   55|
|2004155983986688049|   53|
|2236412472590336483|   53|
|2337822597971968392|   50|
|2407072335396864571|   45|
|1548828859695104235|   45|
|1623586188034048269|   44|
|2522200947032064794|   43|
|2195803229650944396|   43|
+-------------------+-----+
only showing top 20 rows
```
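A `groupby(...).count()` followed by a descending `orderBy` amounts to a frequency count. For a small in-memory list, `collections.Counter.most_common` gives the same ranking. The sketch below uses hypothetical user IDs, and `top_users` is an illustrative name.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_users(actor_ids: Iterable[int], n: int = 20) -> List[Tuple[int, int]]:
    """Return (actor_user_id, transaction_count) pairs, most active first."""
    return Counter(actor_ids).most_common(n)


if __name__ == "__main__":
    # Hypothetical actor IDs, one entry per transaction
    ids = [101, 202, 101, 303, 101, 202]
    print(top_users(ids, n=2))
```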




### Visualizing User Trends: Word Frequencies

### Developing Topic Models

### Segmenting Users Based on Models

### Describing Each Segment

### Requirements

```python
# Shut down Spark; stopping the session also stops its underlying SparkContext
spark.stop()
sc.stop()
```

## Reflections