# W261 Final Project ETL for Development Sample

### Notebook Set-Up

In [20]:
# imports
import time
import numpy as np
import pandas as pd
from pyspark.sql import Row
from pyspark.ml.feature import CountVectorizer
#mllib.linalg library 
from pyspark.sql import DataFrame
from pyspark.sql.functions import isnan

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
#PWD = !pwd
#PWD = PWD[0]

In [4]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "w261FinalProject"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

__`REMINDER:`__ If you are running this notebook on the course docker container, you can monitor the progress of your jobs using the Spark UI at: http://localhost:4040/jobs/

## Load Data

In [5]:
original_trainRDD = sc.textFile('data/train.txt')
original_testRDD = sc.textFile('data/test.txt')

## Transformation

### Sample

In [250]:
#change the seed for a different sample
sampleRDD1, sampleRDD2 = original_trainRDD.randomSplit([0.9999,0.0001], seed = 1)
sampleRDD2.cache()

PythonRDD[521] at RDD at PythonRDD.scala:49

In [251]:
nrow = sampleRDD2.count()
print("This sample contains", str(nrow), "rows.")

This sample contains 4478 rows.


In [343]:
type(sampleRDD2)

pyspark.rdd.PipelinedRDD

In [345]:
sampleRDD2.take(2)

['0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t18\t09ca0b81\t09e68b86\t86c4b829\te3d0459f\t25c83c98\t\t7227c706\t0b153874\ta73ee510\t305a0646\t9625b211\t997a695a\tdccbd94b\t07d13a8f\t36721ddc\tc0b906bb\te5ba7672\t5aed7436\t21ddcdc9\ta458ea53\t0cbbcc92\t\t32c7478e\t0174dd24\t3d2bedd7\td8ecbc17',
 '0\t\t12\t20\t18\t30445\t82\t0\t18\t53\t\t0\t\t18\tbe589b51\t8e465f4d\t35d889dd\t5e5e218f\t25c83c98\t6f6d9be8\t5732a3f8\t0b153874\ta73ee510\ta1680317\td70e2491\t575bb5c9\t2b9f0754\t07d13a8f\te815112f\t85a05c1a\td4bb7bd8\tf2becb37\t\t\tfe89e74a\t\t32c7478e\tbaf42944\t\t']

In [252]:
sampleRDD2.first()

'0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t18\t09ca0b81\t09e68b86\t86c4b829\te3d0459f\t25c83c98\t\t7227c706\t0b153874\ta73ee510\t305a0646\t9625b211\t997a695a\tdccbd94b\t07d13a8f\t36721ddc\tc0b906bb\te5ba7672\t5aed7436\t21ddcdc9\ta458ea53\t0cbbcc92\t\t32c7478e\t0174dd24\t3d2bedd7\td8ecbc17'

In [259]:
#  helper function to normalize the data 
def normalize(dataRDD):
    """
    Scale and center data round mean of each feature.
    Args:
        dataRDD - records are tuples of (features_array, y)
    Returns:
        normedRDD - records are tuples of (features_array, y)
    """
      
    numericalRDD = dataRDD.map(lambda x: list(x[:13])).cache()
    
    def remove_na(col):
        '''
        we needed a column wise calc of mean and std. To do this we had to remove NaN 
        while maintaining the position of the number in the RDD so its emited as an key
        '''
        #good_stuff, id = col[0], col[1]
        for i in range(0,len(col)):
            if (col[i] != '\t') and (col[i] != '-'):
                yield(i,int(col[i]))
        
    featureSums = numericalRDD.flatMap(lambda x: remove_na(x)) \
                               .reduceByKey(lambda a, b: (a + b)).collect()
    featureCount = numericalRDD.flatMap(lambda x: remove_na(x)).countByKey()
    
    #calc mean for each of the 13 columns 
    #featureSums is a list and featureCount is a dictionary, so some matching needed to be completed
    means_dict = dict()
    for j in range(0,len(featureCount)+1):
        for k in range(0,12):
            if featureSums[k][0] == j:
                cur_col_sum = featureSums[k][1]
        if j not in featureCount.keys():
            next
        else:
            cur_col_mean = cur_col_sum / featureCount[j]
            means_dict[j] = cur_col_mean
           # print(j, cur_col_mean)
        
    mean_dict1 = sc.broadcast(means_dict)
    #print(mean_dict1.value)
    
    def calc_std(line):
        key,value = line[0],line[1]
        #yield((key, mean_dict1.value[key]))
        individ_deviation = (value - mean_dict1.value[key])**2
        yield((key,individ_deviation))
    
    
    featureStd = numericalRDD.flatMap(lambda x: remove_na(x)) \
                            .flatMap(lambda x: calc_std(x)) \
                            .reduceByKey(lambda a, b: (a + b)).collect()
    #calc std for each of the 13 columns; test = total error across all observations by key
    std_dict = dict()
    for j in range(0,len(featureCount)+1):
        if j not in featureCount.keys():
            next
        else:
            for k in range(0,12):
                if featureStd[k][0] == j:
                    cur_col_sum = featureStd[k][1]
            cur_col_mean = np.sqrt(cur_col_sum / featureCount[j])
            std_dict[j] = cur_col_mean
    std_dict1 = sc.broadcast(std_dict)
    #print("std_dict", std_dict)
    
    def apply_transformation(col):
        new_row =[]
        for i in range(0,len(col)):
            if (col[i] == '\t') or (col[i] == '-'):
                new_row.append(col[i])
            else:
                key,value = i,int(col[i])
                feature_mean = mean_dict1.value[key]
                feature_std = std_dict1.value[key]
                new_value = round(((value-feature_mean)/feature_std),2)
                new_row.append(new_value)
        return(new_row)
    
    normedRDD = numericalRDD.map(lambda x: apply_transformation(x))
    
    #print(normedRDD.collect())
    
    return(normedRDD)

In [260]:
normedRDD = normalize(sampleRDD2)

In [316]:
normedRDD3 = pd.DataFrame(normedRDD.zipWithIndex().collect())
#normedRDD3.columns = ['a', 'b']
normedRDD3.head()

Unnamed: 0,0,1
0,"[-0.59, \t, \t, 0.62, \t, 0.62, -1.32, \t, -0....",0
1,"[-0.59, \t, \t, -0.61, -0.24, \t, -0.56, -1.32...",1
2,"[1.69, \t, \t, -0.61, \t, -0.86, \t, \t, 2.23,...",2
3,"[-0.59, \t, 3.04, \t, -0.63, 1.35, \t, \t, -0....",3
4,"[1.69, \t, 2.09, \t, -0.63, \t, 1.36, 1.03, \t...",4


In [None]:
sampleRDD3 = sampleRDD2.zipWithIndex().collect()

In [296]:
sampleRDD3 = pd.DataFrame(sampleRDD3)
sampleRDD3.head()

Unnamed: 0,0,1
0,0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t...,0
1,0\t\t12\t20\t18\t30445\t82\t0\t18\t53\t\t0\t\t...,1
2,1\t\t1\t1\t\t993\t\t0\t1\t2\t\t0\t\t\t5a9ed9b0...,2
3,0\t8\t17\t\t2\t622\t22\t79\t21\t557\t1\t9\t0\t...,3
4,1\t6\t1\t76\t5\t7\t0\t30\t4\t5\t1\t6\t\t0\t68f...,4


In [315]:
#sampleRDD4 = pd.DataFrame(sampleRDD3)
sampleRDD3.columns = ['a', 'b']
sampleRDD3.head()

Unnamed: 0,a,b
0,0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t...,0
1,0\t\t12\t20\t18\t30445\t82\t0\t18\t53\t\t0\t\t...,1
2,1\t\t1\t1\t\t993\t\t0\t1\t2\t\t0\t\t\t5a9ed9b0...,2
3,0\t8\t17\t\t2\t622\t22\t79\t21\t557\t1\t9\t0\t...,3
4,1\t6\t1\t76\t5\t7\t0\t30\t4\t5\t1\t6\t\t0\t68f...,4


In [317]:
merged = sampleRDD4.join(normedRDD3,on='b')
merged.head()

Unnamed: 0,a,b,0,1
0,0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t...,0,"[-0.59, \t, \t, 0.62, \t, 0.62, -1.32, \t, -0....",0
1,0\t\t12\t20\t18\t30445\t82\t0\t18\t53\t\t0\t\t...,1,"[-0.59, \t, \t, -0.61, -0.24, \t, -0.56, -1.32...",1
2,1\t\t1\t1\t\t993\t\t0\t1\t2\t\t0\t\t\t5a9ed9b0...,2,"[1.69, \t, \t, -0.61, \t, -0.86, \t, \t, 2.23,...",2
3,0\t8\t17\t\t2\t622\t22\t79\t21\t557\t1\t9\t0\t...,3,"[-0.59, \t, 3.04, \t, -0.63, 1.35, \t, \t, -0....",3
4,1\t6\t1\t76\t5\t7\t0\t30\t4\t5\t1\t6\t\t0\t68f...,4,"[1.69, \t, 2.09, \t, -0.63, \t, 1.36, 1.03, \t...",4


In [320]:
merged2 =  sc.parallelize(merged.values)

In [322]:
merged2.take(2)

[array(['0\t\t4\t50\t18\t3339\t20\t26\t17\t133\t\t2\t\t18\t09ca0b81\t09e68b86\t86c4b829\te3d0459f\t25c83c98\t\t7227c706\t0b153874\ta73ee510\t305a0646\t9625b211\t997a695a\tdccbd94b\t07d13a8f\t36721ddc\tc0b906bb\te5ba7672\t5aed7436\t21ddcdc9\ta458ea53\t0cbbcc92\t\t32c7478e\t0174dd24\t3d2bedd7\td8ecbc17',
        0,
        list([-0.59, '\t', '\t', 0.62, '\t', 0.62, -1.32, '\t', -0.93, 1.81, '\t', -0.3, -0.29]),
        0], dtype=object),
 array(['0\t\t12\t20\t18\t30445\t82\t0\t18\t53\t\t0\t\t18\tbe589b51\t8e465f4d\t35d889dd\t5e5e218f\t25c83c98\t6f6d9be8\t5732a3f8\t0b153874\ta73ee510\ta1680317\td70e2491\t575bb5c9\t2b9f0754\t07d13a8f\te815112f\t85a05c1a\td4bb7bd8\tf2becb37\t\t\tfe89e74a\t\t32c7478e\tbaf42944\t\t',
        1,
        list([-0.59, '\t', '\t', -0.61, -0.24, '\t', -0.56, -1.32, '\t', -0.89, 1.69, '\t', -0.29]),
        1], dtype=object)]

In [366]:
def make_final(line):
    'putting things back in the right shape for use in our next function'
    new_line =[]
    a,b,c,d = line[0],line[1],line[2],line[3]
    new_line.append(str(a[0]))
    for i in range(0,13):
        if c[i]=='\t':
            new_line.append("")
        else:
            new_line.append(str(c[i]))
    for j in range(14,40):
        a_new = a.split('\t')
        if a_new[i]=='\t':
            new_line.append("")
        else:    
            new_line.append(str(a_new[j]))
    final_line= "\t".join(new_line)
    return(final_line)
finalRDD = merged2.map(lambda x: make_final(x))

In [368]:
finalRDD.take(2)

['0\t-0.59\t\t\t0.62\t\t0.62\t-1.32\t\t-0.93\t1.81\t\t-0.3\t-0.29\t09ca0b81\t09e68b86\t86c4b829\te3d0459f\t25c83c98\t\t7227c706\t0b153874\ta73ee510\t305a0646\t9625b211\t997a695a\tdccbd94b\t07d13a8f\t36721ddc\tc0b906bb\te5ba7672\t5aed7436\t21ddcdc9\ta458ea53\t0cbbcc92\t\t32c7478e\t0174dd24\t3d2bedd7\td8ecbc17',
 '0\t-0.59\t\t\t-0.61\t-0.24\t\t-0.56\t-1.32\t\t-0.89\t1.69\t\t-0.29\tbe589b51\t8e465f4d\t35d889dd\t5e5e218f\t25c83c98\t6f6d9be8\t5732a3f8\t0b153874\ta73ee510\ta1680317\td70e2491\t575bb5c9\t2b9f0754\t07d13a8f\te815112f\t85a05c1a\td4bb7bd8\tf2becb37\t\t\tfe89e74a\t\t32c7478e\tbaf42944\t\t']

In [369]:
type(finalRDD)

pyspark.rdd.PipelinedRDD

# Put in wide, sparse feature format

In [371]:
def parseCV(line):
    """
    Map record_csv_string --> (features, label)
    """

    # start of categorical features
    col_start = 14
    
    raw_values = line.split('\t')
    label = int(raw_values[0])  ## y variable 
    
    # ignore numerics to start
    #numerical_values = list(pd.Series(raw_values[1:14]).apply(pd.to_numeric))
    numericals = []
    for idx, value in enumerate(raw_values[1:col_start]):
        if value != '':
            numericals.append('n' + str(idx) + '_' + str(value))
            
    
    categories = []
    for idx, value in enumerate(raw_values[col_start:]):
        if value != '':
            categories.append('c'+ str(idx) + '_' + str(value))

    return Row(label=label, raw=numericals + categories)


def vectorizeCV(DF):
    
    vectorizer = CountVectorizer()
    cv = CountVectorizer(inputCol="raw", outputCol="features")
    
    model = cv.fit(DF)
    result = model.transform(DF)
    
    return result
parsedDF = sampleRDD2.map(parseCV).toDF().cache()
vectorizedDF = vectorizeCV(parsedDF)

In [373]:
#not sure why its 30,946 in the first column...comes out of countvectorizor on spark
#https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
vectorizedDF = vectorizeCV(parsedDF)
vectorizedDF.show()

+-----+--------------------+--------------------+
|label|                 raw|            features|
+-----+--------------------+--------------------+
|    0|[n1_4, n2_50, n3_...|(30946,[0,1,2,4,5...|
|    0|[n1_12, n2_20, n3...|(30946,[0,1,2,5,1...|
|    1|[n1_1, n2_1, n4_9...|(30946,[0,1,6,7,1...|
|    0|[n0_8, n1_17, n3_...|(30946,[0,1,4,12,...|
|    1|[n0_6, n1_1, n2_7...|(30946,[0,1,2,4,1...|
|    1|[n1_99, n2_1, n3_...|(30946,[1,2,4,10,...|
|    0|[n0_3, n1_21, n2_...|(30946,[0,1,4,8,1...|
|    0|[n1_2, n2_20, n3_...|(30946,[0,1,3,5,8...|
|    0|[n0_0, n1_144, n4...|(30946,[0,2,3,4,5...|
|    0|[n1_0, n2_5, n4_3...|(30946,[0,2,3,6,1...|
|    0|[n0_0, n1_1, n2_4...|(30946,[0,1,2,3,5...|
|    0|[n0_9, n1_5, n2_1...|(30946,[0,2,3,6,9...|
|    0|[n1_323, n2_2, n3...|(30946,[1,2,14,16...|
|    0|[n0_0, n1_424, n3...|(30946,[0,1,2,4,6...|
|    0|[n0_0, n1_13, n2_...|(30946,[0,1,2,5,6...|
|    0|[n1_180, n2_6, n3...|(30946,[1,2,8,14,...|
|    0|[n1_126, n2_2, n3...|(30946,[0,2,4,6,8...|


In [376]:
parsedDF = finalRDD.map(parseCV).toDF().cache()
vectorizedTest = vectorizeCV(parsedDF)
#vectorizedTest = vectorizeCV(parsedDF)
vectorizedTest.show()

+-----+--------------------+--------------------+
|label|                 raw|            features|
+-----+--------------------+--------------------+
|    0|[n0_-0.59, n3_0.6...|(25779,[0,1,2,3,5...|
|    0|[n0_-0.59, n3_-0....|(25779,[0,1,2,3,6...|
|    1|[n0_1.69, n3_-0.6...|(25779,[0,2,7,8,1...|
|    0|[n0_-0.59, n2_3.0...|(25779,[0,1,2,5,1...|
|    1|[n0_1.69, n2_2.09...|(25779,[0,2,3,5,1...|
|    1|[n0_1.69, n3_2.66...|(25779,[2,3,5,10,...|
|    0|[n0_-0.59, n2_0.6...|(25779,[0,1,2,5,9...|
|    0|[n0_-0.59, n3_-0....|(25779,[0,1,2,4,6...|
|    0|[n0_-0.59, n2_-0....|(25779,[0,1,3,4,5...|
|    0|[n0_-0.59, n3_-1....|(25779,[0,1,3,4,7...|
|    0|[n0_-0.59, n2_-0....|(25779,[0,1,2,3,4...|
|    0|[n0_-0.59, n2_3.5...|(25779,[0,1,3,4,7...|
|    0|[n0_-0.59, n3_0.2...|(25779,[1,2,3,13,...|
|    0|[n0_-0.59, n2_-0....|(25779,[0,1,2,3,5...|
|    0|[n0_-0.59, n2_-0....|(25779,[0,1,2,3,6...|
|    0|[n0_-0.59, n3_-0....|(25779,[1,2,3,9,1...|
|    0|[n0_-0.59, n3_-0....|(25779,[0,1,3,5,7...|


In [377]:
sc.addPyFile("fm_parallel_sgd.py")
import fm_parallel_sgd as fm

In [378]:
print (vectorizedDF.rdd.count())
print (vectorizedTest.rdd.count())
print (vectorizedDF.rdd.first())
print (vectorizedTest.rdd.first())

4478
4478
Row(label=0, raw=['n1_4', 'n2_50', 'n3_18', 'n4_3339', 'n5_20', 'n6_26', 'n7_17', 'n8_133', 'n10_2', 'n12_18', 'c0_09ca0b81', 'c1_09e68b86', 'c2_86c4b829', 'c3_e3d0459f', 'c4_25c83c98', 'c6_7227c706', 'c7_0b153874', 'c8_a73ee510', 'c9_305a0646', 'c10_9625b211', 'c11_997a695a', 'c12_dccbd94b', 'c13_07d13a8f', 'c14_36721ddc', 'c15_c0b906bb', 'c16_e5ba7672', 'c17_5aed7436', 'c18_21ddcdc9', 'c19_a458ea53', 'c20_0cbbcc92', 'c22_32c7478e', 'c23_0174dd24', 'c24_3d2bedd7', 'c25_d8ecbc17'], features=SparseVector(30946, {0: 1.0, 1: 1.0, 2: 1.0, 4: 1.0, 5: 1.0, 7: 1.0, 10: 1.0, 20: 1.0, 32: 1.0, 122: 1.0, 155: 1.0, 173: 1.0, 214: 1.0, 365: 1.0, 369: 1.0, 495: 1.0, 504: 1.0, 632: 1.0, 635: 1.0, 834: 1.0, 1894: 1.0, 2122: 1.0, 2264: 1.0, 2392: 1.0, 2780: 1.0, 6206: 1.0, 11184: 1.0, 12084: 1.0, 13281: 1.0, 18958: 1.0, 20027: 1.0, 24131: 1.0, 25509: 1.0, 25574: 1.0}))
Row(label=0, raw=['n0_-0.59', 'n3_0.62', 'n5_0.62', 'n6_-1.32', 'n8_-0.93', 'n9_1.81', 'n11_-0.3', 'n12_-0.29', 'c0_09ca0b81

In [381]:
temp = time.time()
model = fm.trainFM_parallel_sgd (sc, vectorizedTest.rdd, iterations=10, iter_sgd= 10, alpha=0.01, regParam=0.01, factorLength=2,\
                      verbose=True, savingFilename = None, evalTraining=None)
print ('time :', time.time()-temp)

iter 	time 	train_logl 	val_logl
0 	0 	0.696670 	0.696431
1 	1 	2.835949 	2.765170
2 	3 	3.017305 	2.939728
3 	4 	2.968775 	2.891082
4 	6 	2.943212 	2.865419
5 	7 	2.934917 	2.856944
6 	9 	2.935350 	2.857216
7 	10 	2.941669 	2.863376
8 	12 	2.949203 	2.870824
9 	14 	2.959387 	2.880917
10 	15 	2.969375 	2.890878
Train set: 
rtv_pr_auc, rtv_auc, logl, mse, accuracy
(0.9601969430901022, 0.9838280326055683, 2.9693748527221677, 0.7087952691773113, 0.2556390977443609)
Validation set:
(0.9266569543807905, 0.9558422354230937, 2.890877994349888, 0.6919755965791868, 0.2750845546786922)
time : 18.536879062652588


In [None]:
print (evaluate(vectorizedTest, model))