# 3.1 Check User IDs

##### Description

The tables are related through the user_id column, which is present in every table. The data must be checked to ensure that every user_id in transactions, logs, and train has a corresponding entry in the members table.

##### Notebook Steps

1. Connect Spark
1. Input all data sources
1. Check for parity
1. Remove records with no corresponding member record
1. Output Data

## 1. Connect Spark

In [1]:
import pyspark
sc = pyspark.SparkContext(appName="mems-clean")
sc.setLogLevel("INFO")

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## 2. Input Data

In [2]:
members = sqlContext.read.format('com.databricks.spark.csv').options(inferschema='true', header='true').load('../../data/2-data_cleaning/2-members.output.csv')

members.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- city: integer (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_date: string (nullable = true)



In [3]:
transactions = sqlContext.read.format('com.databricks.spark.csv').options(inferschema='true', header='true').load('../../data/2-data_cleaning/2-transactions.output.csv')

transactions.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: boolean (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- membership_expire_date: string (nullable = true)
 |-- is_cancel: boolean (nullable = true)



In [4]:
logs = sqlContext.read.format('com.databricks.spark.csv').options(inferschema='true', header='true').load('../../data/2-data_cleaning/2-logs.output.csv')

logs.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_25: integer (nullable = true)
 |-- num_50: integer (nullable = true)
 |-- num_75: integer (nullable = true)
 |-- num_985: integer (nullable = true)
 |-- num_100: integer (nullable = true)
 |-- num_unq: integer (nullable = true)
 |-- total_secs: integer (nullable = true)



In [5]:
train = sqlContext.read.format('com.databricks.spark.csv').options(inferschema='true', header='true').load('../../data/2-data_cleaning/2-train.output.csv')

train.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- is_churn: boolean (nullable = true)



## 3. Check for Parity

In [10]:
from pyspark.sql.types import *      
from pyspark.sql.functions import *  
import numpy as np

This command performs a left anti join to select the user_ids that appear in transactions but not in members. The same join is repeated for logs and train, each against the members table.

In [35]:
no_id_transactions = transactions.join(members, ['user_id'], 'leftanti').select('user_id')
no_id_logs = logs.join(members, ['user_id'], 'leftanti').select('user_id')
no_id_train = train.join(members, ['user_id'], 'leftanti').select('user_id')
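To make the join direction concrete, here is a minimal pure-Python sketch of what a left anti join keeps. The `toy_` tables, their rows, and the helper `left_anti` are hypothetical and exist only for illustration; they are not part of the notebook or of PySpark.

```python
# Toy stand-ins for the real tables; every row here is invented.
toy_members = [{"user_id": "a"}, {"user_id": "b"}]
toy_transactions = [
    {"user_id": "a", "actual_amount_paid": 99},
    {"user_id": "c", "actual_amount_paid": 149},
    {"user_id": "c", "actual_amount_paid": 149},
]

def left_anti(left, right, key):
    """Return the rows of `left` whose `key` value never appears in `right`."""
    right_keys = {row[key] for row in right}
    return [row for row in left if row[key] not in right_keys]

# Equivalent in spirit to transactions.join(members, ['user_id'], 'leftanti').select('user_id')
toy_no_id_transactions = [
    row["user_id"] for row in left_anti(toy_transactions, toy_members, "user_id")
]
print(toy_no_id_transactions)  # ['c', 'c']
```

The result comes from the left table only, and a repeated user_id stays repeated, so the list of missing ids still needs de-duplicating afterwards.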

We then outer join the three data frames and keep the distinct values, which gives every user_id that is missing from members.

In [39]:
no_id = no_id_transactions.join(no_id_logs, on='user_id', how='outer')
no_id = no_id.join(no_id_train, on='user_id', how='outer')
no_id = no_id.select("user_id").distinct()
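Because each of the three frames holds only the user_id column, the outer joins followed by distinct() amount to a set union of the three id lists. A minimal pure-Python sketch of that idea, using invented ids rather than the notebook's data:

```python
# Invented id lists standing in for no_id_transactions, no_id_logs and no_id_train.
toy_no_id_transactions = ["c", "c", "d"]
toy_no_id_logs = ["d", "e"]
toy_no_id_train = ["c"]

# Union of the three lists with duplicates removed, mirroring outer join + distinct().
toy_no_id = set(toy_no_id_transactions) | set(toy_no_id_logs) | set(toy_no_id_train)
print(sorted(toy_no_id))  # ['c', 'd', 'e']
```

In PySpark a similar result could presumably be reached by stacking the three frames with `union` (`unionAll` on Spark 1.x) and then calling `distinct()`, which avoids the row multiplication that a join produces when ids repeat. That is a suggestion only, not what the notebook does.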

##### Number of IDs not in members

In [43]:
no_id.count()

121409

## 4. Remove Records

In [46]:
transactions.count()

1431009

In [47]:
transactions = transactions.join(no_id, ['user_id'], 'leftanti')
transactions.count()

1303156

In [48]:
logs.count()

18396362

In [49]:
logs = logs.join(no_id, ['user_id'], 'leftanti')
logs.count()

18395950

In [50]:
train.count()

970960

In [51]:
train = train.join(no_id, ['user_id'], 'leftanti')
train.count()

860967
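The removal step and a follow-up parity check can be sketched in plain Python. The data below is invented and the names (`toy_member_ids`, `cleaned`, `orphans`) are hypothetical; the sketch only mirrors the logic of the anti joins above.

```python
toy_member_ids = {"a", "b"}
toy_transactions = [("a", 99), ("c", 149), ("b", 100), ("d", 129)]
toy_no_id = {"c", "d", "e"}  # ids missing from members, as built in the previous step

# Mirror of transactions.join(no_id, ['user_id'], 'leftanti'):
# drop every row whose user_id is in the missing-id set.
cleaned = [row for row in toy_transactions if row[0] not in toy_no_id]

# Parity check: after removal, every remaining user_id should be a member.
orphans = {user_id for user_id, _ in cleaned} - toy_member_ids
print(len(toy_transactions), len(cleaned), orphans)  # 4 2 set()
```

In the notebook itself, an analogous check would be to rerun the left anti join of each cleaned frame against members and confirm that `count()` returns 0.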

## 5. Output Data

In [53]:
filepath = '../../data/3-eda/3.1-members.csv'

members.write.format('com.databricks.spark.csv').options(header='true').save(filepath)

In [54]:
filepath = '../../data/3-eda/3.1-transactions.csv'

transactions.write.format('com.databricks.spark.csv').options(header='true').save(filepath)

In [55]:
filepath = '../../data/3-eda/3.1-logs.csv'

logs.write.format('com.databricks.spark.csv').options(header='true').save(filepath)

In [56]:
filepath = '../../data/3-eda/3.1-train.csv'

train.write.format('com.databricks.spark.csv').options(header='true').save(filepath)