In [1]:
from pyspark.sql import *
from pyspark import SparkConf

from pyspark.sql import DataFrame
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from submission import base_features_gen_pipeline, gen_meta_features, test_prediction

import random
rseed = 1024
random.seed(rseed)


def gen_binary_labels(df):
    df = df.withColumn('label_0', (df['label'] == 0).cast(DoubleType()))
    df = df.withColumn('label_1', (df['label'] == 1).cast(DoubleType()))
    df = df.withColumn('label_2', (df['label'] == 2).cast(DoubleType()))
    return df

# Create a Spark Session
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load data
train_data = spark.read.load("proj2train.csv", format="csv", sep="\t", inferSchema="true", header="true")
test_data = spark.read.load("proj2test.csv", format="csv", sep="\t", inferSchema="true", header="true")

In [7]:
train_data.groupBy('category').count().show()

+--------+-----+
|category|count|
+--------+-----+
|    FOOD|  901|
|     PAS|  542|
|    MISC|  798|
+--------+-----+



In [8]:
test_data.groupBy('category').count().show()

+--------+-----+
|category|count|
+--------+-----+
|    FOOD|  418|
|     PAS|  167|
|    MISC|  215|
+--------+-----+



In [2]:
# build the pipeline from task 1.1
base_features_pipeline = base_features_gen_pipeline()
# Fit the pipeline using train_data
base_features_pipeline_model = base_features_pipeline.fit(train_data)
# Transform the train_data using fitted pipeline
training_set = base_features_pipeline_model.transform(train_data)

In [3]:
# assign random groups and binarize the labels
training_set = training_set.withColumn('group', (rand(rseed)*5).cast(IntegerType()))
training_set = gen_binary_labels(training_set)

# define base models
nb_0 = NaiveBayes(featuresCol='features', labelCol='label_0', predictionCol='nb_pred_0', probabilityCol='nb_prob_0', rawPredictionCol='nb_raw_0')
nb_1 = NaiveBayes(featuresCol='features', labelCol='label_1', predictionCol='nb_pred_1', probabilityCol='nb_prob_1', rawPredictionCol='nb_raw_1')
nb_2 = NaiveBayes(featuresCol='features', labelCol='label_2', predictionCol='nb_pred_2', probabilityCol='nb_prob_2', rawPredictionCol='nb_raw_2')
svm_0 = LinearSVC(featuresCol='features', labelCol='label_0', predictionCol='svm_pred_0', rawPredictionCol='svm_raw_0')
svm_1 = LinearSVC(featuresCol='features', labelCol='label_1', predictionCol='svm_pred_1', rawPredictionCol='svm_raw_1')
svm_2 = LinearSVC(featuresCol='features', labelCol='label_2', predictionCol='svm_pred_2', rawPredictionCol='svm_raw_2')

# build pipeline to generate predictions from base classifiers, will be used in task 1.3
gen_base_pred_pipeline = Pipeline(stages=[nb_0, nb_1, nb_2, svm_0, svm_1, svm_2])
gen_base_pred_pipeline_model = gen_base_pred_pipeline.fit(training_set)

# task 1.2
meta_features = gen_meta_features(training_set, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2)

# build onehotencoder and vectorassembler pipeline 
onehot_encoder = OneHotEncoderEstimator(inputCols=['nb_pred_0', 'nb_pred_1', 'nb_pred_2', 'svm_pred_0', 'svm_pred_1', 'svm_pred_2', 'joint_pred_0', 'joint_pred_1', 'joint_pred_2'], outputCols=['vec{}'.format(i) for i in range(9)])
vector_assembler = VectorAssembler(inputCols=['vec{}'.format(i) for i in range(9)], outputCol='meta_features')
gen_meta_feature_pipeline = Pipeline(stages=[onehot_encoder, vector_assembler])
gen_meta_feature_pipeline_model = gen_meta_feature_pipeline.fit(meta_features)
meta_features = gen_meta_feature_pipeline_model.transform(meta_features)

# train the meta clasifier
lr_model = LogisticRegression(featuresCol='meta_features', labelCol='label', predictionCol='final_prediction', maxIter=20, regParam=1., elasticNetParam=0)
meta_classifier = lr_model.fit(meta_features)

In [4]:
# task 1.3
pred_test = test_prediction(test_data, base_features_pipeline_model, gen_base_pred_pipeline_model, gen_meta_feature_pipeline_model, meta_classifier)


test_df_sechema:
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)

trans_test_df_SCHEMA:
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)

new_predict_SCHEMA:
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- nb_raw_0: vector (nullable = true)
 |-- nb_prob_0: vector (nullable = true)
 |-- nb_pred_0: double (nullable = false)
 |-- nb_raw_1: vector (nullable = true)
 |-- nb_prob_1: vector (nullable = true)
 |-- nb_pred_1: double (nullable = false)
 |-- nb_r

In [7]:
test_data.count()

800

In [6]:
meta_features.select('id','nb_pred_0','svm_pred_0','joint_pred_0').show(5)

+---+---------+----------+------------+
| id|nb_pred_0|svm_pred_0|joint_pred_0|
+---+---------+----------+------------+
|  3|      0.0|       0.0|         0.0|
|  5|      1.0|       1.0|         3.0|
| 14|      0.0|       0.0|         0.0|
| 21|      1.0|       0.0|         2.0|
| 22|      0.0|       0.0|         0.0|
+---+---------+----------+------------+
only showing top 5 rows



In [7]:
meta_features.select('id','nb_pred_1','svm_pred_1','joint_pred_1').show(5)

+---+---------+----------+------------+
| id|nb_pred_1|svm_pred_1|joint_pred_1|
+---+---------+----------+------------+
|  3|      1.0|       0.0|         2.0|
|  5|      0.0|       0.0|         0.0|
| 14|      1.0|       1.0|         3.0|
| 21|      0.0|       0.0|         0.0|
| 22|      1.0|       1.0|         3.0|
+---+---------+----------+------------+
only showing top 5 rows



In [8]:
meta_features.select('id','nb_pred_2','svm_pred_2','joint_pred_2').show(5)

+---+---------+----------+------------+
| id|nb_pred_2|svm_pred_2|joint_pred_2|
+---+---------+----------+------------+
|  3|      0.0|       0.0|         0.0|
|  5|      0.0|       0.0|         0.0|
| 14|      0.0|       0.0|         0.0|
| 21|      0.0|       1.0|         1.0|
| 22|      0.0|       0.0|         0.0|
+---+---------+----------+------------+
only showing top 5 rows



In [9]:
meta_features.count()

2241

In [10]:
training_set.count()

2241

In [31]:
from pyspark.sql.functions import lit

In [32]:
ss=training_set.filter(training_set['group']==1).withColumn('extra_1',lit(0))

In [34]:
ss['features','extra_1'].show()

+--------------------+-------+
|            features|extra_1|
+--------------------+-------+
|(5421,[0,7,47,49,...|      0|
|(5421,[0,3,4,14,7...|      0|
|(5421,[0,1,4,8,13...|      0|
|(5421,[2,10,171,5...|      0|
|(5421,[0,3,11,12,...|      0|
|(5421,[3,6,10,19,...|      0|
|(5421,[0,21,25,43...|      0|
|(5421,[7,14,28,72...|      0|
|(5421,[1,2,3,18,2...|      0|
|(5421,[0,1,2,4,7,...|      0|
|(5421,[42,66,1177...|      0|
|(5421,[0,2,3,4,10...|      0|
|(5421,[0,1,6,10,1...|      0|
|(5421,[1,2,5,16,2...|      0|
|(5421,[0,1,12,16,...|      0|
|(5421,[6,17,22,25...|      0|
|(5421,[2,8,13,14,...|      0|
|(5421,[0,1,2,4,8,...|      0|
|(5421,[5,94,163,2...|      0|
|(5421,[0,1,2,3,5,...|      0|
+--------------------+-------+
only showing top 20 rows



In [45]:
tt=[]
for i in range(439):
    tt.append(i)

pyspark.sql.dataframe.DataFrame

In [49]:
#ss2=training_set.filter(training_set['group']==1).withColumn('extra_1',lit(tt))

In [7]:
spark.stop()

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as F

spark = SparkSession.builder.appName('joins_example').getOrCreate()

sc = spark.sparkContext

dataset1 = [

  {

  'key' : 'abc',

  'val11' : 1.1,

  'val12' : 1.2

  },

  {

  'key' : 'def',

  'val11' : 3.0,

  'val12' : 3.4

  }

]

dataset2 = [

  {

  'key' : 'abc',

  'val21' : 2.1,

  'val22' : 2.2

  },

  {

  'key' : 'xyz',

  'val21' : 3.1,

  'val22' : 3.2

  }

]

rdd1 = sc.parallelize(dataset1)

df1 = spark.createDataFrame(rdd1)

print('df1')

df1.show()

rdd2 = sc.parallelize(dataset2)

df2 = spark.createDataFrame(rdd2)

print('df2')

df2.show()



df1
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
+---+-----+-----+

df2
+---+-----+-----+
|key|val21|val22|
+---+-----+-----+
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+



In [2]:
df=df1.union(df2)
df.show()

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+



In [3]:
df_u=df.union(df2)
df_u.show()

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+



In [4]:
df = df1.join(df2, df1.key ==df2.key, how='inner')

df.show()

+---+-----+-----+---+-----+-----+
|key|val11|val12|key|val21|val22|
+---+-----+-----+---+-----+-----+
|abc|  1.1|  1.2|abc|  2.1|  2.2|
+---+-----+-----+---+-----+-----+



In [5]:
df_1 = df1.join(df2, on=['key'], how='inner')

df_1.show()

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
+---+-----+-----+-----+-----+



In [6]:
df_2= df1.join(df2, on=['key'], how='left')
df_2.show()

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+



In [20]:
dataset3 = [

  {

  'key' : 'def',

  'val11' : 3.0,

  'val12' : 3.4, 'val21':5, 'val22':6

  }

]

In [21]:
rdd3 = sc.parallelize(dataset3)

df3 = spark.createDataFrame(rdd3)
df3.show()

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|def|  3.0|  3.4|    5|    6|
+---+-----+-----+-----+-----+



In [31]:
df_4 = df_2.join(df3, [df_2.key==df3.key, df_2.val11==df3.val11, df_2.val12==df_2.val12],how='inner')
df_4.show()

+---+-----+-----+-----+-----+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+---+-----+-----+-----+-----+
|def|  3.0|  3.4| null| null|def|  3.0|  3.4|    5|    6|
+---+-----+-----+-----+-----+---+-----+-----+-----+-----+



In [None]:
df_4 = df_2.join(df3, df_2.key==df3.key and df_2.val11=df3.val11 and df_2.val12==df_2.val12,how='inner')
df_4.show()

In [12]:
df2.show()

+---+-----+-----+
|key|val21|val22|
+---+-----+-----+
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+



In [19]:
df_2.show()

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+



In [8]:
g=6

In [12]:
w=double(g)

NameError: name 'double' is not defined

In [11]:
type(w)

float

In [15]:
bool(2==2.0)

True

In [16]:
float(0)

0.0

In [12]:
type(base_features_pipeline_model)

pyspark.ml.pipeline.PipelineModel

In [13]:
base_features_pipeline_model.transform(train_data).show(3)

+---+--------+--------------------+--------------------+--------------------+-----+
| id|category|            descript|               words|            features|label|
+---+--------+--------------------+--------------------+--------------------+-----+
|  0|    MISC|I've been there t...|[i've, been, ther...|(5421,[1,18,31,39...|  1.0|
|  1|    FOOD|Stay away from th...|[stay, away, from...|(5421,[0,1,15,20,...|  0.0|
|  2|    FOOD|Wow over 100 beer...|[wow, over, 100, ...|(5421,[3,109,556,...|  0.0|
+---+--------+--------------------+--------------------+--------------------+-----+
only showing top 3 rows

