# rdd.DataFrame Vs pd.DataFrame

In [55]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import numpy as np

In [2]:
spark = SparkSession.builder.appName("Python Example")\
        .config("spark.some.config.option", "some-value")\
        .getOrCreate()

In [3]:
# From List
my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']

In [4]:
# Pandas
df = pd.DataFrame(my_list, col_name)

In [5]:
ds = spark.createDataFrame(my_list,col_name)

In [6]:
df

Unnamed: 0,0,1,2
A,a,1,2
B,b,2,3
C,c,3,4


In [7]:
ds.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+



# Read Data set from csv File in Spark

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark csv")\
        .config("spark.some.config.option", "some-value")\
        .getOrCreate()

In [9]:
ds = spark.read.csv("datas/Advertising.csv",sep = ",", encoding= "UTF-8",
                   comment = None, header = True, inferSchema= True)

In [10]:
ds.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [11]:
# In Pandas 
df = pd.read_csv("./datas/Advertising.csv")
df.head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


## Column Name

In [12]:
df.columns

Index(['Unnamed: 0', 'TV', 'radio', 'newspaper', 'sales'], dtype='object')

In [13]:
ds.columns

['_c0', 'TV', 'radio', 'newspaper', 'sales']

# Data Types

In [14]:
df.dtypes

Unnamed: 0      int64
TV            float64
radio         float64
newspaper     float64
sales         float64
dtype: object

In [15]:
ds.dtypes

[('_c0', 'int'),
 ('TV', 'double'),
 ('radio', 'double'),
 ('newspaper', 'double'),
 ('sales', 'double')]

# Fill Null

In [16]:
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
dp = pd.DataFrame(my_list, columns= ["A", "B", "C"])
ds_ = spark.createDataFrame(my_list,["A", "B", "C"])

In [17]:
dp

Unnamed: 0,A,B,C
0,male,1,
1,female,2,3.0
2,male,3,4.0


In [18]:
ds_.show()

+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [19]:
# fill na value
dp.fillna(10)

Unnamed: 0,A,B,C
0,male,1,10.0
1,female,2,3.0
2,male,3,4.0


In [20]:
ds_.fillna(10).show()

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1| 10|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



# Replace the Values

In [21]:
# caution: you need to chose specific col
#df.A.replace(['male', 'female'],[1, 0], inplace=True)
print(df.head())
#caution: Mixed type replacements are not supported
ds.na.replace(['male','female'],['1','0']).show()

   Unnamed: 0     TV  radio  newspaper  sales
0           1  230.1   37.8       69.2   22.1
1           2   44.5   39.3       45.1   10.4
2           3   17.2   45.9       69.3    9.3
3           4  151.5   41.3       58.5   18.5
4           5  180.8   10.8       58.4   12.9
+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7| 24.0|      4.0| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|     46.0| 19.0|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|    114.0| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2

# Rename the Columns

In [22]:
df.columns = ["a", "b", "c", "d","e"] # New Column Name
df.head()

Unnamed: 0,a,b,c,d,e
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


In [23]:
ds.toDF("a","b","c","d","e").show()

+---+-----+----+-----+----+
|  a|    b|   c|    d|   e|
+---+-----+----+-----+----+
|  1|230.1|37.8| 69.2|22.1|
|  2| 44.5|39.3| 45.1|10.4|
|  3| 17.2|45.9| 69.3| 9.3|
|  4|151.5|41.3| 58.5|18.5|
|  5|180.8|10.8| 58.4|12.9|
|  6|  8.7|48.9| 75.0| 7.2|
|  7| 57.5|32.8| 23.5|11.8|
|  8|120.2|19.6| 11.6|13.2|
|  9|  8.6| 2.1|  1.0| 4.8|
| 10|199.8| 2.6| 21.2|10.6|
| 11| 66.1| 5.8| 24.2| 8.6|
| 12|214.7|24.0|  4.0|17.4|
| 13| 23.8|35.1| 65.9| 9.2|
| 14| 97.5| 7.6|  7.2| 9.7|
| 15|204.1|32.9| 46.0|19.0|
| 16|195.4|47.7| 52.9|22.4|
| 17| 67.8|36.6|114.0|12.5|
| 18|281.4|39.6| 55.8|24.4|
| 19| 69.2|20.5| 18.3|11.3|
| 20|147.3|23.9| 19.1|14.6|
+---+-----+----+-----+----+
only showing top 20 rows



# Rename One Or More Columns

In [24]:
mapping = {"b":"b_changed", "Sales":"D"}

In [25]:
df.rename(columns=mapping).head()

Unnamed: 0,a,b_changed,c,d,e
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


In [26]:
new_names = [mapping.get(col, col) for col in ds.columns]
new_names

['_c0', 'TV', 'radio', 'newspaper', 'sales']

In [27]:
ds.toDF(*new_names).show()

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7| 24.0|      4.0| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|     46.0| 19.0|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|    114.0| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
+---+-----+-----+---------+-----+
only showing top 20 rows



# Drop Columns

In [28]:
df.columns = new_names

In [29]:
drop_name = ["newspaper", "sales"]

In [30]:
df.drop(drop_name, axis=1).head()

Unnamed: 0,_c0,TV,radio
0,1,230.1,37.8
1,2,44.5,39.3
2,3,17.2,45.9
3,4,151.5,41.3
4,5,180.8,10.8


In [31]:
ds.drop(*drop_name).show() # * is used if Certain Column needed to drop
# Not All

+---+-----+-----+
|_c0|   TV|radio|
+---+-----+-----+
|  1|230.1| 37.8|
|  2| 44.5| 39.3|
|  3| 17.2| 45.9|
|  4|151.5| 41.3|
|  5|180.8| 10.8|
|  6|  8.7| 48.9|
|  7| 57.5| 32.8|
|  8|120.2| 19.6|
|  9|  8.6|  2.1|
| 10|199.8|  2.6|
| 11| 66.1|  5.8|
| 12|214.7| 24.0|
| 13| 23.8| 35.1|
| 14| 97.5|  7.6|
| 15|204.1| 32.9|
| 16|195.4| 47.7|
| 17| 67.8| 36.6|
| 18|281.4| 39.6|
| 19| 69.2| 20.5|
| 20|147.3| 23.9|
+---+-----+-----+
only showing top 20 rows



# Filter

In [32]:
df = pd.read_csv("./datas/Advertising.csv")

In [33]:
ds = spark.read.csv("./datas/Advertising.csv", header= True, inferSchema= True)

In [34]:
ds.show()

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7| 24.0|      4.0| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|     46.0| 19.0|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|    114.0| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
+---+-----+-----+---------+-----+
only showing top 20 rows



In [35]:
df[df.newspaper <20].head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales
7,8,120.2,19.6,11.6,13.2
8,9,8.6,2.1,1.0,4.8
11,12,214.7,24.0,4.0,17.4
13,14,97.5,7.6,7.2,9.7
18,19,69.2,20.5,18.3,11.3


In [36]:
ds[ds.newspaper <20].show()

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 12|214.7| 24.0|      4.0| 17.4|
| 14| 97.5|  7.6|      7.2|  9.7|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
| 25| 62.3| 12.6|     18.3|  9.7|
| 26|262.9|  3.5|     19.5| 12.0|
| 27|142.9| 29.3|     12.6| 15.0|
| 34|265.6| 20.0|      0.3| 17.4|
| 35| 95.7|  1.4|      7.4|  9.5|
| 36|290.7|  4.1|      8.5| 12.8|
| 37|266.9| 43.8|      5.0| 25.4|
| 43|293.6| 27.7|      1.8| 20.7|
| 48|239.9| 41.5|     18.5| 23.2|
| 52|100.4|  9.6|      3.6| 10.7|
| 55|262.7| 28.8|     15.9| 20.2|
| 58|136.2| 19.2|     16.6| 13.2|
| 60|210.7| 29.5|      9.3| 18.4|
| 64|102.7| 29.6|      8.4| 14.0|
+---+-----+-----+---------+-----+
only showing top 20 rows



# Comparison

In [37]:
df[(df.newspaper<20) & (df.TV > 100)].head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales
7,8,120.2,19.6,11.6,13.2
11,12,214.7,24.0,4.0,17.4
19,20,147.3,23.9,19.1,14.6
25,26,262.9,3.5,19.5,12.0
26,27,142.9,29.3,12.6,15.0


In [38]:
ds[(ds.newspaper < 20) & (ds.TV >100)].show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  8|120.2| 19.6|     11.6| 13.2|
| 12|214.7| 24.0|      4.0| 17.4|
| 20|147.3| 23.9|     19.1| 14.6|
| 26|262.9|  3.5|     19.5| 12.0|
| 27|142.9| 29.3|     12.6| 15.0|
+---+-----+-----+---------+-----+
only showing top 5 rows



# With New Column

In [39]:
df["TV_norm"] = df.TV/ sum(df.TV)
df.head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales,TV_norm
0,1,230.1,37.8,69.2,22.1,0.007824
1,2,44.5,39.3,45.1,10.4,0.001513
2,3,17.2,45.9,69.3,9.3,0.000585
3,4,151.5,41.3,58.5,18.5,0.005152
4,5,180.8,10.8,58.4,12.9,0.006148


In [46]:
ds.withColumn("TV_norm",\
              ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0])\
            .show()

+---+-----+-----+---------+-----+--------------------+
|_c0|   TV|radio|newspaper|sales|             TV_norm|
+---+-----+-----+---------+-----+--------------------+
|  1|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
|  2| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
|  3| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|  4|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
|  5|180.8| 10.8|     58.4| 12.9|0.006147882414948061|
|  6|  8.7| 48.9|     75.0|  7.2|2.958328374449564E-4|
|  7| 57.5| 32.8|     23.5| 11.8|0.001955217029090...|
|  8|120.2| 19.6|     11.6| 13.2|0.004087253685159...|
|  9|  8.6|  2.1|      1.0|  4.8|2.924324600030603...|
| 10|199.8|  2.6|     21.2| 10.6| 0.00679395412890831|
| 11| 66.1|  5.8|     24.2|  8.6|0.002247649489093...|
| 12|214.7| 24.0|      4.0| 17.4|0.007300610367750821|
| 13| 23.8| 35.1|     65.9|  9.2|8.092898311712601E-4|
| 14| 97.5|  7.6|      7.2|  9.7|0.003315368005848...|
| 15|204.1| 32.9|     46.0| 19.0| 0.00694017035890984|
| 16|195.4

In [47]:
df.head(2)

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales,TV_norm
0,1,230.1,37.8,69.2,22.1,0.007824
1,2,44.5,39.3,45.1,10.4,0.001513


In [49]:
# Create a new column in Panda Data Frame "cond" with the Condition below
# 1 if (tv >100) & Radio <40
# 2 if Sales >10 
# else 10

df["cond"] = df.apply(lambda c : 1 if ((c.TV >100) & (c.radio < 40))\
                      else 2 if c.sales >10 \
                      else 3 , axis =1 )

In [50]:
df.head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales,TV_norm,cond
0,1,230.1,37.8,69.2,22.1,0.007824,1
1,2,44.5,39.3,45.1,10.4,0.001513,2
2,3,17.2,45.9,69.3,9.3,0.000585,3
3,4,151.5,41.3,58.5,18.5,0.005152,2
4,5,180.8,10.8,58.4,12.9,0.006148,1


In [53]:
# Doing same Condition in Py spark
ds.withColumn("cond", F.when((ds.TV>100) & (ds.radio < 40),1)\
             .when(ds.sales >10, 2)\
             .otherwise(3)).show()

# For Small Data Set Py Spark is time Consuming
# Fast for Big data sets

+---+-----+-----+---------+-----+----+
|_c0|   TV|radio|newspaper|sales|cond|
+---+-----+-----+---------+-----+----+
|  1|230.1| 37.8|     69.2| 22.1|   1|
|  2| 44.5| 39.3|     45.1| 10.4|   2|
|  3| 17.2| 45.9|     69.3|  9.3|   3|
|  4|151.5| 41.3|     58.5| 18.5|   2|
|  5|180.8| 10.8|     58.4| 12.9|   1|
|  6|  8.7| 48.9|     75.0|  7.2|   3|
|  7| 57.5| 32.8|     23.5| 11.8|   2|
|  8|120.2| 19.6|     11.6| 13.2|   1|
|  9|  8.6|  2.1|      1.0|  4.8|   3|
| 10|199.8|  2.6|     21.2| 10.6|   1|
| 11| 66.1|  5.8|     24.2|  8.6|   3|
| 12|214.7| 24.0|      4.0| 17.4|   1|
| 13| 23.8| 35.1|     65.9|  9.2|   3|
| 14| 97.5|  7.6|      7.2|  9.7|   3|
| 15|204.1| 32.9|     46.0| 19.0|   1|
| 16|195.4| 47.7|     52.9| 22.4|   2|
| 17| 67.8| 36.6|    114.0| 12.5|   2|
| 18|281.4| 39.6|     55.8| 24.4|   1|
| 19| 69.2| 20.5|     18.3| 11.3|   2|
| 20|147.3| 23.9|     19.1| 14.6|   1|
+---+-----+-----+---------+-----+----+
only showing top 20 rows



In [58]:
# Create a New Column "log_tv"  with corresponding to log(TV)
df["log_tv"] = np.log(df["TV"])
df.head(3)

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales,TV_norm,cond,log_tv
0,1,230.1,37.8,69.2,22.1,0.007824,1,5.438514
1,2,44.5,39.3,45.1,10.4,0.001513,2,3.795489
2,3,17.2,45.9,69.3,9.3,0.000585,3,2.844909


In [59]:
# Same Thing With Py Spark
ds.withColumn("log_tv", F.log(ds.TV)).show()

+---+-----+-----+---------+-----+------------------+
|_c0|   TV|radio|newspaper|sales|            log_tv|
+---+-----+-----+---------+-----+------------------+
|  1|230.1| 37.8|     69.2| 22.1|  5.43851399704132|
|  2| 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
|  3| 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
|  4|151.5| 41.3|     58.5| 18.5| 5.020585624949423|
|  5|180.8| 10.8|     58.4| 12.9|5.1973914479580765|
|  6|  8.7| 48.9|     75.0|  7.2| 2.163323025660538|
|  7| 57.5| 32.8|     23.5| 11.8| 4.051784947803305|
|  8|120.2| 19.6|     11.6| 13.2| 4.789157022101107|
|  9|  8.6|  2.1|      1.0|  4.8| 2.151762203259462|
| 10|199.8|  2.6|     21.2| 10.6| 5.297316866214453|
| 11| 66.1|  5.8|     24.2|  8.6| 4.191168746857641|
| 12|214.7| 24.0|      4.0| 17.4| 5.369241704884735|
| 13| 23.8| 35.1|     65.9|  9.2| 3.169685580677429|
| 14| 97.5|  7.6|      7.2|  9.7| 4.579852378003801|
| 15|204.1| 32.9|     46.0| 19.0| 5.318610069815799|
| 16|195.4| 47.7|     52.9| 22.4| 5.2750487396

In [60]:
# Create a new Column "tv+10" 

In [63]:
df["tv+10"] = df["TV"] + 10
df.head(3)

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales,TV_norm,cond,log_tv,tv+10
0,1,230.1,37.8,69.2,22.1,0.007824,1,5.438514,240.1
1,2,44.5,39.3,45.1,10.4,0.001513,2,3.795489,54.5
2,3,17.2,45.9,69.3,9.3,0.000585,3,2.844909,27.2


In [64]:
ds.withColumn("tv+10", ds["TV"]+10).show(5)

+---+-----+-----+---------+-----+-----+
|_c0|   TV|radio|newspaper|sales|tv+10|
+---+-----+-----+---------+-----+-----+
|  1|230.1| 37.8|     69.2| 22.1|240.1|
|  2| 44.5| 39.3|     45.1| 10.4| 54.5|
|  3| 17.2| 45.9|     69.3|  9.3| 27.2|
|  4|151.5| 41.3|     58.5| 18.5|161.5|
|  5|180.8| 10.8|     58.4| 12.9|190.8|
+---+-----+-----+---------+-----+-----+
only showing top 5 rows



# Join Functions

In [68]:
left_pd = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
'B': ['B0', 'B1', 'B2', 'B3'],
'C': ['C0', 'C1', 'C2', 'C3'],
'D': ['D0', 'D1', 'D2', 'D3']},
index=[0, 1, 2, 3])
left_pd

Unnamed: 0,A,B,C,D
0,A0,B0,C0,D0
1,A1,B1,C1,D1
2,A2,B2,C2,D2
3,A3,B3,C3,D3


In [69]:
right_pd = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
'F': ['B4', 'B5', 'B6', 'B7'],
'G': ['C4', 'C5', 'C6', 'C7'],
'H': ['D4', 'D5', 'D6', 'D7']},
index=[4, 5, 6, 7])
right_pd

Unnamed: 0,A,F,G,H
4,A0,B4,C4,D4
5,A1,B5,C5,D5
6,A6,B6,C6,D6
7,A7,B7,C7,D7


In [70]:
left_sp = spark.createDataFrame(left_pd)
right_sp = spark.createDataFrame(right_pd)

In [73]:
left_sp.show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
| A0| B0| C0| D0|
| A1| B1| C1| D1|
| A2| B2| C2| D2|
| A3| B3| C3| D3|
+---+---+---+---+



In [74]:
right_sp.show()

+---+---+---+---+
|  A|  F|  G|  H|
+---+---+---+---+
| A0| B4| C4| D4|
| A1| B5| C5| D5|
| A6| B6| C6| D6|
| A7| B7| C7| D7|
+---+---+---+---+



#### Left Join

In [80]:
left_pd.merge(right_pd, on = "A", how = "right")

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A6,,,,B6,C6,D6
3,A7,,,,B7,C7,D7


In [82]:
left_sp.join(right_sp, on = "A", how = "left")\
        .orderBy("A", ascending = True).show()

+---+---+---+---+----+----+----+
|  A|  B|  C|  D|   F|   G|   H|
+---+---+---+---+----+----+----+
| A0| B0| C0| D0|  B4|  C4|  D4|
| A1| B1| C1| D1|  B5|  C5|  D5|
| A2| B2| C2| D2|null|null|null|
| A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+



#### Right Join

In [84]:

left_df.merge(right_df, on = "A", how = "right")

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A6,,,,B6,C6,D6
3,A7,,,,B7,C7,D7


In [86]:
# right join
left_sp.join(right_sp, on = "A", how = "right")\
    .orderBy("A", ascending = True).show()

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
| A6|null|null|null| B6| C6| D6|
| A7|null|null|null| B7| C7| D7|
+---+----+----+----+---+---+---+



### Inner Join

In [87]:
left_df.merge(right_df, on = "A", how = "inner") # Intersects Only

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5


In [90]:
left_sp.join(right_sp, on = "A", how = "inner")\
    .orderBy("A", ascending = True).show()

+---+---+---+---+---+---+---+
|  A|  B|  C|  D|  F|  G|  H|
+---+---+---+---+---+---+---+
| A0| B0| C0| D0| B4| C4| D4|
| A1| B1| C1| D1| B5| C5| D5|
+---+---+---+---+---+---+---+



#### Full Join

In [91]:
left_df.merge(right_df, on = "A", how = "outer")

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A2,B2,C2,D2,,,
3,A3,B3,C3,D3,,,
4,A6,,,,B6,C6,D6
5,A7,,,,B7,C7,D7


In [92]:
left_sp.join(right_sp, on = "A", how = "full")\
        .orderBy("A", ascending = True).show()

+---+----+----+----+----+----+----+
|  A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+
| A0|  B0|  C0|  D0|  B4|  C4|  D4|
| A1|  B1|  C1|  D1|  B5|  C5|  D5|
| A2|  B2|  C2|  D2|null|null|null|
| A3|  B3|  C3|  D3|null|null|null|
| A6|null|null|null|  B6|  C6|  D6|
| A7|null|null|null|  B7|  C7|  D7|
+---+----+----+----+----+----+----+



## Concat Columns

In [94]:
my_list = [("a", 2,3),
          ("b",5,6),
          ("C",8,9),
          ("a",3,6),
          ("b",3,4),
          ("m",3,8),
          ("k",5,3)]
col_name = ["Col1", "Col2", "Col3"]

df = pd.DataFrame(my_list, columns=col_name)

In [96]:
ds = spark.createDataFrame(my_list, schema= col_name)

In [98]:
df

Unnamed: 0,Col1,Col2,Col3
0,a,2,3
1,b,5,6
2,C,8,9
3,a,3,6
4,b,3,4
5,m,3,8
6,k,5,3


In [101]:
ds.show()

+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|   a|   2|   3|
|   b|   5|   6|
|   C|   8|   9|
|   a|   3|   6|
|   b|   3|   4|
|   m|   3|   8|
|   k|   5|   3|
+----+----+----+



In [103]:
df["Concat"]= df.apply(lambda x : "%s%s"%(x["Col1"],x["Col2"]), axis = 1)
df

Unnamed: 0,Col1,Col2,Col3,Concat
0,a,2,3,a2
1,b,5,6,b5
2,C,8,9,C8
3,a,3,6,a3
4,b,3,4,b3
5,m,3,8,m3
6,k,5,3,k5


In [104]:
ds.withColumn('Concat', F.concat("col1", "col2")).show()

+----+----+----+------+
|Col1|Col2|Col3|Concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   C|   8|   9|    C8|
|   a|   3|   6|    a3|
|   b|   3|   4|    b3|
|   m|   3|   8|    m3|
|   k|   5|   3|    k5|
+----+----+----+------+



# GroupBy

In [107]:
df.groupby(["Col1"]).agg({"Col2":"min", "Col3": "mean"})

Unnamed: 0_level_0,Col2,Col3
Col1,Unnamed: 1_level_1,Unnamed: 2_level_1
C,8,9.0
a,2,4.5
b,3,5.0
k,5,3.0
m,3,8.0


In [108]:
ds.groupBy(["Col1"]).agg({"Col2" : "min", "Col3": "avg"}).show()

+----+---------+---------+
|Col1|min(Col2)|avg(Col3)|
+----+---------+---------+
|   m|        3|      8.0|
|   k|        5|      3.0|
|   C|        8|      9.0|
|   b|        3|      5.0|
|   a|        2|      4.5|
+----+---------+---------+



# Pivot

In [110]:
pd.pivot_table(df, values = "Col3", index = "Col1", columns= "Col2",\
              aggfunc=np.sum)

Col2,2,3,5,8
Col1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
C,,,,9.0
a,3.0,6.0,,
b,,4.0,6.0,
k,,,3.0,
m,,8.0,,


In [111]:
ds.groupBy(["Col1"]).pivot("col2").sum("col3").show()

+----+----+----+----+----+
|Col1|   2|   3|   5|   8|
+----+----+----+----+----+
|   m|null|   8|null|null|
|   k|null|null|   3|null|
|   C|null|null|null|   9|
|   b|null|   4|   6|null|
|   a|   3|   6|null|null|
+----+----+----+----+----+



# Window

In [113]:
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)

In [117]:
dp["rank"] = dp.groupby("B")["C"].rank("dense", ascending = False)
display(dp)

from pyspark.sql.window import Window
w = Window.partitionBy("B").orderBy(ds.C.desc())
ds = ds.withColumn("rank", F.rank().over(w))
ds.show()

Unnamed: 0,A,B,C,rank
0,a,m,1,2.0
1,b,m,2,1.0
2,c,n,3,2.0
3,d,n,6,1.0


+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+



# rank Vs dense_rank

In [119]:
d ={'Id':[1,2,3,4,5,6],
'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}

In [120]:
data = pd.DataFrame(d)

In [122]:
dp = data.copy()
ds = spark.createDataFrame(dp)

In [123]:
dp["Rank_dense"] = dp["Score"].rank(method = "dense", ascending = False)
dp["Rank"] = dp["Score"].rank(method = "min", ascending = False)
dp

Unnamed: 0,Id,Score,Rank_dense,Rank
0,1,4.0,1.0,1.0
1,2,4.0,1.0,1.0
2,3,3.85,2.0,3.0
3,4,3.65,3.0,4.0
4,5,3.65,3.0,4.0
5,6,3.5,4.0,6.0


In [124]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [125]:
w = Window.orderBy(ds.Score.desc())
ds = ds.withColumn("Rank_spark_dense", F.dense_rank().over(w))
ds.withColumn("Rank_spark", F.rank().over(w))
ds.show()

+---+-----+----------------+
| Id|Score|Rank_spark_dense|
+---+-----+----------------+
|  1|  4.0|               1|
|  2|  4.0|               1|
|  3| 3.85|               2|
|  4| 3.65|               3|
|  5| 3.65|               3|
|  6|  3.5|               4|
+---+-----+----------------+



# Chanpter 6 Statistics and Linear Algebra

# Chapter 7 DATA Exploration
A journey of a thousand miles begins with a single step

`**describe**` function in pandas and spark will give most of the statistical results, such as min, median, max, quartiles and standard deviation.

In [133]:
df = pd.read_csv("datas/german.data")

In [134]:
df.head()

Unnamed: 0,A11 6 A34 A43 1169 A65 A75 4 A93 A101 4 A121 67 A143 A152 2 A173 1 A192 A201 1
0,A12 48 A32 A43 5951 A61 A73 2 A92 A101 2 A121 ...
1,A14 12 A34 A46 2096 A61 A74 2 A93 A101 3 A121 ...
2,A11 42 A32 A42 7882 A61 A74 2 A93 A103 4 A122 ...
3,A11 24 A33 A40 4870 A61 A73 3 A93 A101 4 A124 ...
4,A14 36 A32 A46 9055 A65 A73 2 A93 A101 4 A124 ...


## 8.2.13 PCA

In [137]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

In [138]:
data = [(Vectors.sparse(5,[(1,1.0),(3,7.0)]),),
       (Vectors.dense([2.0, 0.0, 3.0 , 4.0, 5.0]),),
       (Vectors.dense([4.0, 0.0, 6.0, 7.0]),)]

df = spark.createDataFrame(data, ["features"])

In [139]:
data

[(SparseVector(5, {1: 1.0, 3: 7.0}),),
 (DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]),),
 (DenseVector([4.0, 0.0, 6.0, 7.0]),)]

In [142]:
df.show()

+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|   [4.0,0.0,6.0,7.0]|
+--------------------+



In [144]:
pca = PCA(k=3, inputCol = "features", outputCol="pcaFeatures")
model = pca.fit(df)

Py4JJavaError: An error occurred while calling o502.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 129.0 failed 1 times, most recent failure: Lost task 1.0 in stage 129.0 (TID 2685, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when merging with another summarizer. Expecting 5 but got 4.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.merge(MultivariateOnlineSummarizer.scala:135)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$18.apply(RowMatrix.scala:435)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$18.apply(RowMatrix.scala:435)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:153)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:152)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:165)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1143)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1206)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1182)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeColumnSummaryStatistics(RowMatrix.scala:433)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:348)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:401)
	at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:53)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:99)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:70)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when merging with another summarizer. Expecting 5 but got 4.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.merge(MultivariateOnlineSummarizer.scala:135)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$18.apply(RowMatrix.scala:435)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$18.apply(RowMatrix.scala:435)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:153)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:152)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:165)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [145]:
result = model.transform(df).select("pcaFeatures")
result.show()

NameError: name 'model' is not defined

# 8.3 Feature Selection

## 8.3.1 LASSO

In [146]:
df = spark.createDataFrame([
    (0,"yes"),
    (1, "Yes"),
    (2, "Yes"),
    (3, "Yes"),
    (4, "No"),
    (5, "No")
    ], ["id", "label"])
df.show()

+---+-----+
| id|label|
+---+-----+
|  0|  yes|
|  1|  Yes|
|  2|  Yes|
|  3|  Yes|
|  4|   No|
|  5|   No|
+---+-----+



## 8.4.1 Calculate Undersampling Ratio

In [147]:
import math
def round_up(n, decimal = 0):
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier)/ multiplier

In [149]:
df.show()

+---+-----+
| id|label|
+---+-----+
|  0|  yes|
|  1|  Yes|
|  2|  Yes|
|  3|  Yes|
|  4|   No|
|  5|   No|
+---+-----+



In [151]:
df = df.dropna()

In [152]:
label_y = df.filter(df.label == "Yes")
label_N = df.filter(df.label == "No")

sample_ratio = round_up(label_N.count() / df.count(),2)
sample_ratio

NameError: name 'decimals' is not defined

## chapter 9 Regression

##### 1.. Set up spark context and SparkSession

In [153]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark Regression")\
        .config("spark.some.option", "some-value")\
        .getOrCreate()

##### 2. Load the data

In [154]:
# Load the dataset
df = spark.read.format("com.databricks.spark.csv").\
            options(header = "true",\
            inferschema = "true").load("datas/Advertising.csv", header = True)

In [156]:
# Check the data set
df.show(5,True)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [157]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- radio: double (nullable = true)
 |-- newspaper: double (nullable = true)
 |-- sales: double (nullable = true)



In [161]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+
|summary|               _c0|               TV|             radio|         newspaper|             sales|
+-------+------------------+-----------------+------------------+------------------+------------------+
|  count|               200|              200|               200|               200|               200|
|   mean|             100.5|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|57.879184513951124|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|                 1|              0.7|               0.0|               0.3|               1.6|
|    max|               200|            296.4|              49.6|             114.0|              27.0|
+-------+------------------+-----------------+------------------+------------------+------------------+



##### Convert data to dense Vector (Feature and label)

In [162]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [163]:
df.show(3)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
+---+-----+-----+---------+-----+
only showing top 3 rows



In [165]:
# Two ways to build the features and labels
# Method 1  (Good for small Features)
def transData_(row):
    return Row(label = row["sales"],
               features = Vectors.dense([row["TV"],
                                         row["Radio"],
                                         row["Newspaper"]]))

In [166]:
# Method 2 
# Good for large Fetures
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]])\
                    .tpDF(["features", "labels"])