## 1. Basics of pyspark dataframe

In [1]:
import pyspark
from pathlib import Path
import pandas as pd

In [2]:
content_root = Path('..')

In [3]:
data = pd.read_csv(content_root.joinpath('data', 'dummydata.csv'))

In [4]:
data

Unnamed: 0,Name,Age,Height
0,Sanjay,26,158
1,Rakshit,26,160
2,Medha,23,150


In [5]:
from pyspark.sql import SparkSession

In [6]:
sc = pyspark.SparkConf()

In [7]:
sc

<pyspark.conf.SparkConf at 0x15fec9190>

In [8]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

21/09/12 20:13:16 WARN Utils: Your hostname, Sanjays-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
21/09/12 20:13:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/12 20:13:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
spark

In [10]:
df_spark = spark.read\
.option('header', 'true')\
.csv(str(content_root.joinpath('data', 'dummydata.csv')))

                                                                                

In [11]:
df_spark

DataFrame[Name: string, Age: string, Height: string]

In [12]:
df_spark.show()

+-------+---+------+
|   Name|Age|Height|
+-------+---+------+
| Sanjay| 26|   158|
|Rakshit| 26|   160|
|  Medha| 23|   150|
+-------+---+------+



In [13]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [14]:
df_spark.head(10)

[Row(Name='Sanjay', Age='26', Height='158'),
 Row(Name='Rakshit', Age='26', Height='160'),
 Row(Name='Medha', Age='23', Height='150')]

In [15]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Height: string (nullable = true)



In [16]:
## read the dataset

df_spark = spark.read.option('header', 'true').csv('../data/dummydata.csv', inferSchema=True)

In [17]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Height: integer (nullable = true)



In [18]:
# Selecting a column

from IPython.display import display

df_spark.select('Name').show()

df_spark.select('Name', 'Age').show()

df_spark.show()

+-------+
|   Name|
+-------+
| Sanjay|
|Rakshit|
|  Medha|
+-------+

+-------+---+
|   Name|Age|
+-------+---+
| Sanjay| 26|
|Rakshit| 26|
|  Medha| 23|
+-------+---+

+-------+---+------+
|   Name|Age|Height|
+-------+---+------+
| Sanjay| 26|   158|
|Rakshit| 26|   160|
|  Medha| 23|   150|
+-------+---+------+



In [19]:
# Datatypes

df_spark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Height', 'int')]

In [20]:
# Describe

df_spark.describe()

df_spark.describe().show()

                                                                                

+-------+------+------------------+-----------------+
|summary|  Name|               Age|           Height|
+-------+------+------------------+-----------------+
|  count|     3|                 3|                3|
|   mean|  null|              25.0|            156.0|
| stddev|  null|1.7320508075688772|5.291502622129181|
|    min| Medha|                23|              150|
|    max|Sanjay|                26|              160|
+-------+------+------------------+-----------------+



In [21]:
# Adding cols & dropping cols

df_spark = df_spark.withColumn('Age after 10yr', df_spark['Age']+10)

df_spark.show()

+-------+---+------+--------------+
|   Name|Age|Height|Age after 10yr|
+-------+---+------+--------------+
| Sanjay| 26|   158|            36|
|Rakshit| 26|   160|            36|
|  Medha| 23|   150|            33|
+-------+---+------+--------------+



In [22]:
# Now dropping

df_spark = df_spark.drop('Age after 10yr')

df_spark.show()

+-------+---+------+
|   Name|Age|Height|
+-------+---+------+
| Sanjay| 26|   158|
|Rakshit| 26|   160|
|  Medha| 23|   150|
+-------+---+------+



In [23]:
# Rename the column

df_spark = df_spark.withColumnRenamed('Height', 'Weight')

In [24]:
df_spark.show()

+-------+---+------+
|   Name|Age|Weight|
+-------+---+------+
| Sanjay| 26|   158|
|Rakshit| 26|   160|
|  Medha| 23|   150|
+-------+---+------+



In [25]:
spark.stop()

---

## 2. Some more manipulation

In [26]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [27]:
df_spark = spark.read.csv(
    '../data/dummydata2.csv',
    header=True,
    inferSchema=True
)

df_spark.show()

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|    Sanjay| 26|       3.5|   2017|  0.7|
|     Medha| 23|       0.6|   2020|  0.7|
|Steve Jobs|100|      null|   null|  1.0|
| Elon Musk| 50|      null|   null|  1.0|
|     Montu| 16|      null|   2025|  0.1|
+----------+---+----------+-------+-----+



In [28]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: double (nullable = true)
 |-- Passout: integer (nullable = true)
 |-- Score: double (nullable = true)



In [29]:
# Removing anything that is na

df_spark.na.drop().show()

df_spark.na.drop(how='all').show()

df_spark.na.drop(how='any').show()

+------+---+----------+-------+-----+
|  Name|Age|Experience|Passout|Score|
+------+---+----------+-------+-----+
|Sanjay| 26|       3.5|   2017|  0.7|
| Medha| 23|       0.6|   2020|  0.7|
+------+---+----------+-------+-----+

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|    Sanjay| 26|       3.5|   2017|  0.7|
|     Medha| 23|       0.6|   2020|  0.7|
|Steve Jobs|100|      null|   null|  1.0|
| Elon Musk| 50|      null|   null|  1.0|
|     Montu| 16|      null|   2025|  0.1|
+----------+---+----------+-------+-----+

+------+---+----------+-------+-----+
|  Name|Age|Experience|Passout|Score|
+------+---+----------+-------+-----+
|Sanjay| 26|       3.5|   2017|  0.7|
| Medha| 23|       0.6|   2020|  0.7|
+------+---+----------+-------+-----+



In [30]:
# Drop with threshold

df_spark.na.drop(how='any', 
                 thresh=4 # Atleast 4 non-null values should be present
                ).show()

+------+---+----------+-------+-----+
|  Name|Age|Experience|Passout|Score|
+------+---+----------+-------+-----+
|Sanjay| 26|       3.5|   2017|  0.7|
| Medha| 23|       0.6|   2020|  0.7|
| Montu| 16|      null|   2025|  0.1|
+------+---+----------+-------+-----+



In [31]:
# Subset in drop

df_spark.na.drop(subset='Passout' # Deletes all null values in the column 'Passout'
                ).show()

+------+---+----------+-------+-----+
|  Name|Age|Experience|Passout|Score|
+------+---+----------+-------+-----+
|Sanjay| 26|       3.5|   2017|  0.7|
| Medha| 23|       0.6|   2020|  0.7|
| Montu| 16|      null|   2025|  0.1|
+------+---+----------+-------+-----+



In [32]:
# fill na

df_spark.na.fill(0, # puts 0 wherever there is null in 'Experience'
                'Experience'
                ).show()


df_spark.na.fill(100, # puts 100 wherever there is null in dataframe which has integer/float dtypes
                ).show()

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|    Sanjay| 26|       3.5|   2017|  0.7|
|     Medha| 23|       0.6|   2020|  0.7|
|Steve Jobs|100|       0.0|   null|  1.0|
| Elon Musk| 50|       0.0|   null|  1.0|
|     Montu| 16|       0.0|   2025|  0.1|
+----------+---+----------+-------+-----+

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|    Sanjay| 26|       3.5|   2017|  0.7|
|     Medha| 23|       0.6|   2020|  0.7|
|Steve Jobs|100|     100.0|    100|  1.0|
| Elon Musk| 50|     100.0|    100|  1.0|
|     Montu| 16|     100.0|   2025|  0.1|
+----------+---+----------+-------+-----+



In [33]:
# Replace with mean

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Experience', 'Passout'],
    outputCols=[f'{i}_mean' for i in ['Experience', 'Passout']]
    ).setStrategy('mean')

In [34]:
imputer.fit(df_spark).transform(df_spark).show()

+----------+---+----------+-------+-----+---------------+------------+
|      Name|Age|Experience|Passout|Score|Experience_mean|Passout_mean|
+----------+---+----------+-------+-----+---------------+------------+
|    Sanjay| 26|       3.5|   2017|  0.7|            3.5|        2017|
|     Medha| 23|       0.6|   2020|  0.7|            0.6|        2020|
|Steve Jobs|100|      null|   null|  1.0|           2.05|        2020|
| Elon Musk| 50|      null|   null|  1.0|           2.05|        2020|
|     Montu| 16|      null|   2025|  0.1|           2.05|        2025|
+----------+---+----------+-------+-----+---------------+------------+



In [35]:
spark.stop()

---

## 3. Filter and few additional operations

In [36]:
from pyspark.sql import SparkSession

In [37]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [38]:
df_spark = spark.read.csv('../data/dummydata2.csv',
                          header=True,
                          inferSchema=True)
df_spark.show()

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|    Sanjay| 26|       3.5|   2017|  0.7|
|     Medha| 23|       0.6|   2020|  0.7|
|Steve Jobs|100|      null|   null|  1.0|
| Elon Musk| 50|      null|   null|  1.0|
|     Montu| 16|      null|   2025|  0.1|
+----------+---+----------+-------+-----+



In [39]:
# Filter

df_spark.filter('Score<=0.5').show()

df_spark.filter('Score<=0.5').select(['Name', 'Age']).show()

df_spark.filter((df_spark['Score']>=0.5) & 
                 (df_spark['Age']>=30)
                ).show()

+-----+---+----------+-------+-----+
| Name|Age|Experience|Passout|Score|
+-----+---+----------+-------+-----+
|Montu| 16|      null|   2025|  0.1|
+-----+---+----------+-------+-----+

+-----+---+
| Name|Age|
+-----+---+
|Montu| 16|
+-----+---+

+----------+---+----------+-------+-----+
|      Name|Age|Experience|Passout|Score|
+----------+---+----------+-------+-----+
|Steve Jobs|100|      null|   null|  1.0|
| Elon Musk| 50|      null|   null|  1.0|
+----------+---+----------+-------+-----+



In [40]:
spark.stop()

---

## 4. Groupby and aggregate

In [41]:
spark = SparkSession.builder.appName('dfs').getOrCreate()

In [42]:
df_spark = spark.read.csv('../data/dummydata3.csv',
                         header=True,
                         inferSchema=True)

df_spark.show()

+----------+----------+----------+
|      Name|Occupation|Experience|
+----------+----------+----------+
|    Sanjay|  Software|       3.5|
|     Medha| Marketing|       0.6|
| Elon Musk|  Software|      40.0|
|Bill Gates|  Software|      60.0|
|Other Basu| Marketing|     100.0|
|  Person 1|     Sales|       5.0|
|  Person A|     Admin|      10.0|
+----------+----------+----------+



In [43]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Experience: double (nullable = true)



In [44]:
df_spark.dtypes

[('Name', 'string'), ('Occupation', 'string'), ('Experience', 'double')]

In [45]:
# Groupby

df_spark.groupBy('Occupation').sum('Experience').show() # the sum of experience per group

df_spark.groupBy('Occupation').count().show() # The count of entries per group

df_spark.groupBy('Occupation').max('Experience').show() # The max value of experience per group

df_spark.groupBy('Occupation').min('Experience').show()

+----------+---------------+
|Occupation|sum(Experience)|
+----------+---------------+
|     Sales|            5.0|
|     Admin|           10.0|
| Marketing|          100.6|
|  Software|          103.5|
+----------+---------------+



                                                                                

+----------+-----+
|Occupation|count|
+----------+-----+
|     Sales|    1|
|     Admin|    1|
| Marketing|    2|
|  Software|    3|
+----------+-----+

+----------+---------------+
|Occupation|max(Experience)|
+----------+---------------+
|     Sales|            5.0|
|     Admin|           10.0|
| Marketing|          100.0|
|  Software|           60.0|
+----------+---------------+

+----------+---------------+
|Occupation|min(Experience)|
+----------+---------------+
|     Sales|            5.0|
|     Admin|           10.0|
| Marketing|            0.6|
|  Software|            3.5|
+----------+---------------+



In [46]:
spark.stop()

---

## 5. MLlib

In [47]:
spark = SparkSession.builder.appName('mllib').getOrCreate()

In [48]:
data = spark.read.csv('../data/mllibdata.csv',
                     header=True,
                     inferSchema=True)

data.show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|   Sanjay| 31|        10|300000|
|Sudhanshu| 30|         8|250000|
|    Sunny| 29|         4|200000|
|     Paul| 24|         3|200000|
|   Harsha| 21|         1|150000|
|  Shubham| 23|         2|180000|
+---------+---+----------+------+



In [49]:
data.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [50]:
# We need feature assembler in pyspark to create the features to be used by our algorithm

from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['Age','Experience'],
                                  outputCol='independent_feature')

In [51]:
output = featureassembler.transform(data)

output.show()

+---------+---+----------+------+-------------------+
|     Name|Age|Experience|Salary|independent_feature|
+---------+---+----------+------+-------------------+
|   Sanjay| 31|        10|300000|        [31.0,10.0]|
|Sudhanshu| 30|         8|250000|         [30.0,8.0]|
|    Sunny| 29|         4|200000|         [29.0,4.0]|
|     Paul| 24|         3|200000|         [24.0,3.0]|
|   Harsha| 21|         1|150000|         [21.0,1.0]|
|  Shubham| 23|         2|180000|         [23.0,2.0]|
+---------+---+----------+------+-------------------+



In [52]:
final_data = output.select('independent_feature', 'Salary')

final_data.show()

+-------------------+------+
|independent_feature|Salary|
+-------------------+------+
|        [31.0,10.0]|300000|
|         [30.0,8.0]|250000|
|         [29.0,4.0]|200000|
|         [24.0,3.0]|200000|
|         [21.0,1.0]|150000|
|         [23.0,2.0]|180000|
+-------------------+------+



In [53]:
train_data, test_data = final_data.randomSplit([0.60, 0.40],
                                               seed=21)
train_data.show()
test_data.show()

+-------------------+------+
|independent_feature|Salary|
+-------------------+------+
|         [23.0,2.0]|180000|
|         [24.0,3.0]|200000|
|         [29.0,4.0]|200000|
|        [31.0,10.0]|300000|
+-------------------+------+

+-------------------+------+
|independent_feature|Salary|
+-------------------+------+
|         [21.0,1.0]|150000|
|         [30.0,8.0]|250000|
+-------------------+------+



In [54]:
# Lets do linear regression

from pyspark.ml.regression import LinearRegression

regr = LinearRegression(featuresCol='independent_feature',
                       labelCol='Salary')
regr = regr.fit(train_data)

21/09/12 20:13:46 WARN Instrumentation: [d7411ad2] regParam is zero, which might cause numerical instability and overfitting.
21/09/12 20:13:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/09/12 20:13:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/09/12 20:13:46 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
21/09/12 20:13:46 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [55]:
regr.coefficients

DenseVector([-2982.4561, 17719.2982])

In [56]:
pred_res = regr.evaluate(test_data)

pred_res.predictions.show()

+-------------------+------+------------------+
|independent_feature|Salary|        prediction|
+-------------------+------+------------------+
|         [21.0,1.0]|150000|170701.75438596515|
|         [30.0,8.0]|250000| 267894.7368421052|
+-------------------+------+------------------+



In [57]:
pred_res.meanAbsoluteError, pred_res.rootMeanSquaredError

(19298.245614035186, 19349.214988017284)