In [53]:
import pyspark
import pandas as pd

In [54]:
# creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Practice").getOrCreate()

# creating a data frame, infer schema used to maintain the datatypes
data_spark = spark.read.option('header', 'true').csv("F:\programming\python-development\python-questions\Database\stock_prices.csv", inferSchema=True)
data_spark.show()

+------+------+------+
|  date|prices|salary|
+------+------+------+
| 6-Mar|   310| 56721|
| 7-Mar|   340| 98761|
| 8-Mar|   380|  NULL|
| 9-Mar|   302| 56123|
|10-Mar|   297| 28912|
|11-Mar|   323| 98683|
|  NULL|   324| 26181|
|12-Mar|  NULL|  NULL|
+------+------+------+



In [55]:
# print the schema of the data frame
data_spark.printSchema()

root
 |-- date: string (nullable = true)
 |-- prices: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [59]:
data_spark.columns # list all the columns
data_spark.dtypes # check the dtype of the data frames

data_spark.describe().show() # describing the data frame
# data_spark.select('prices').show() # selecting a columns & returns a data frame
# data_spark.withColumnRenamed('prices', 'old_prices').show() # renaming the columns
# data_spark.drop('new_prices').show() drops the column from the data frame
# data_spark.withColumn('new_prices', data_spark['prices'] * 1.01).show() # create a new column

+-------+------+------------------+------------------+
|summary|  date|            prices|            salary|
+-------+------+------------------+------------------+
|  count|     7|                 7|                 6|
|   mean|  NULL|325.14285714285717|60896.833333333336|
| stddev|  NULL|28.263218567769734|32031.032530448763|
|    min|10-Mar|               297|             26181|
|    max| 9-Mar|               380|             98761|
+-------+------+------------------+------------------+



In [69]:
data_spark.na.drop(how='any', thresh=1, subset=['prices']).show() # drop the rows with missing values, how='any' , how='all'
data_spark.na.fill('Not defined', ['prices', 'salary']).show() # fill the missing values with the given value

+------+------+------+
|  date|prices|salary|
+------+------+------+
| 6-Mar|   310| 56721|
| 7-Mar|   340| 98761|
| 8-Mar|   380|  NULL|
| 9-Mar|   302| 56123|
|10-Mar|   297| 28912|
|11-Mar|   323| 98683|
|  NULL|   324| 26181|
+------+------+------+

+------+------+------+
|  date|prices|salary|
+------+------+------+
| 6-Mar|   310| 56721|
| 7-Mar|   340| 98761|
| 8-Mar|   380|  NULL|
| 9-Mar|   302| 56123|
|10-Mar|   297| 28912|
|11-Mar|   323| 98683|
|  NULL|   324| 26181|
|12-Mar|  NULL|  NULL|
+------+------+------+



In [71]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean',
    inputCols=['prices', 'salary'],
    outputCols=['prices_imputed', 'salary_imputed'])

imputer.fit(data_spark).transform(data_spark).show()

+------+------+------+--------------+--------------+
|  date|prices|salary|prices_imputed|salary_imputed|
+------+------+------+--------------+--------------+
| 6-Mar|   310| 56721|           310|         56721|
| 7-Mar|   340| 98761|           340|         98761|
| 8-Mar|   380|  NULL|           380|         60896|
| 9-Mar|   302| 56123|           302|         56123|
|10-Mar|   297| 28912|           297|         28912|
|11-Mar|   323| 98683|           323|         98683|
|  NULL|   324| 26181|           324|         26181|
|12-Mar|  NULL|  NULL|           325|         60896|
+------+------+------+--------------+--------------+



In [83]:
# filter the data frame & ~ is a not operator
data_spark.filter("salary>40000 and salary<60000").select(['date', 'salary']).show()

# group by the data frame and perform the aggregation
data = spark.read.csv("F:\programming\python-development\python-questions\Database\pyspark_practice_data.csv", header=True, inferSchema=True)
data.show()

+-----+------+
| date|salary|
+-----+------+
|6-Mar| 56721|
|9-Mar| 56123|
+-----+------+

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [92]:
# group by the data frame and perform the aggregation
data.groupBy('name').sum().show()
data.groupBy('Departments').sum().show()
data.groupBy('Departments').count().show()

data.agg( {'salary': 'sum'}).show()

+---------+-----------+
|     name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



In [97]:
# Machine Learning 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

training = spark.read.csv("F:\programming\python-development\python-questions\Database\\advertising-data.csv", header=True, inferSchema=True)
training.printSchema()

root
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [100]:
# [TV, Radio, Newspaper] --> Group them to create a joint independent features
feature = VectorAssembler(inputCols=['TV', 'Radio', 'Newspaper'], outputCol='Independent Features')
output = feature.transform(training)
output.head(5)

[Row(TV=230.1, Radio=37.8, Newspaper=69.2, Sales=22.1, Independent Features=DenseVector([230.1, 37.8, 69.2])),
 Row(TV=44.5, Radio=39.3, Newspaper=45.1, Sales=10.4, Independent Features=DenseVector([44.5, 39.3, 45.1])),
 Row(TV=17.2, Radio=45.9, Newspaper=69.3, Sales=12.0, Independent Features=DenseVector([17.2, 45.9, 69.3])),
 Row(TV=151.5, Radio=41.3, Newspaper=58.5, Sales=16.5, Independent Features=DenseVector([151.5, 41.3, 58.5])),
 Row(TV=180.8, Radio=10.8, Newspaper=58.4, Sales=17.9, Independent Features=DenseVector([180.8, 10.8, 58.4]))]

In [103]:
finalized_data = output.select('Independent Features', 'Sales')
finalized_data.show()

+--------------------+-----+
|Independent Features|Sales|
+--------------------+-----+
|   [230.1,37.8,69.2]| 22.1|
|    [44.5,39.3,45.1]| 10.4|
|    [17.2,45.9,69.3]| 12.0|
|   [151.5,41.3,58.5]| 16.5|
|   [180.8,10.8,58.4]| 17.9|
|     [8.7,48.9,75.0]|  7.2|
|    [57.5,32.8,23.5]| 11.8|
|   [120.2,19.6,11.6]| 13.2|
|       [8.6,2.1,1.0]|  4.8|
|    [199.8,2.6,21.2]| 15.6|
|     [66.1,5.8,24.2]| 12.6|
|    [214.7,24.0,4.0]| 17.4|
|    [23.8,35.1,65.9]|  9.2|
|      [97.5,7.6,7.2]| 13.7|
|   [204.1,32.9,46.0]| 19.0|
|   [195.4,47.7,52.9]| 22.4|
|   [67.8,36.6,114.0]| 12.5|
|   [281.4,39.6,55.8]| 24.4|
|    [69.2,20.5,18.3]| 11.3|
|   [147.3,23.9,19.1]| 14.6|
+--------------------+-----+
only showing top 20 rows



In [106]:
train_data, test_data = finalized_data.randomSplit([0.70, 0.30])
model = LinearRegression(featuresCol='Independent Features', labelCol='Sales')
model = model.fit(train_data)
model.coefficients, model.intercept

(DenseVector([0.055, 0.1051, -0.0006]), 4.606766133922833)

In [110]:
prediction = model.evaluate(test_data)
prediction.predictions.show()

+--------------------+-----+------------------+
|Independent Features|Sales|        prediction|
+--------------------+-----+------------------+
|      [4.1,11.6,5.7]|  3.2| 6.047410369490728|
|      [8.4,27.2,2.1]|  5.7| 7.925128401077178|
|     [13.1,0.4,25.6]|  5.3| 5.353430261567498|
|    [17.9,37.6,21.6]|  8.0| 9.528127075079963|
|     [25.6,39.0,9.3]|  9.5|10.106571201253132|
|     [27.5,1.6,20.7]|  6.9| 6.274830164519072|
|    [36.9,38.6,65.6]| 10.8|10.650865085836086|
|    [38.0,40.3,11.9]| 10.9|10.923728238967449|
|    [44.7,25.8,20.6]| 10.1| 9.763560252751759|
|    [57.5,32.8,23.5]| 11.8|11.201357357441697|
|    [59.6,12.0,43.1]|  9.7| 9.119403050626158|
|     [66.1,5.8,24.2]| 12.6| 8.837545127421357|
|    [66.9,11.7,36.8]|  9.7| 9.493473526110103|
|    [68.4,44.5,35.6]| 13.6|13.022604503674025|
|    [74.7,49.4,45.7]| 14.7|  13.8776441658489|
|     [75.5,10.8,6.0]| 11.9| 9.891430073241242|
|      [97.5,7.6,7.2]| 13.7|10.764881586297147|
|     [100.4,9.6,3.6]| 10.7| 11.13680776

In [111]:
prediction.r2, prediction.meanAbsoluteError, prediction.meanSquaredError

(0.899329964948218, 1.2089819858871158, 2.530603055640235)

In [112]:
# handling categorical variables
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='Departments', outputCol='Departments_index')
output_fixed = indexer.fit(data).transform(data)
output_fixed.show()

+---------+------------+------+-----------------+
|     Name| Departments|salary|Departments_index|
+---------+------------+------+-----------------+
|    Krish|Data Science| 10000|              1.0|
|    Krish|         IOT|  5000|              2.0|
|   Mahesh|    Big Data|  4000|              0.0|
|    Krish|    Big Data|  4000|              0.0|
|   Mahesh|Data Science|  3000|              1.0|
|Sudhanshu|Data Science| 20000|              1.0|
|Sudhanshu|         IOT| 10000|              2.0|
|Sudhanshu|    Big Data|  5000|              0.0|
|    Sunny|Data Science| 10000|              1.0|
|    Sunny|    Big Data|  2000|              0.0|
+---------+------------+------+-----------------+

