In [1]:
import pyspark

In [2]:
# import Spark Sessiom
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Create Spark Session
spark = SparkSession.builder.appName("TestSpark").getOrCreate()

df = spark.read.csv('fakefriends.csv')

In [3]:
spark

In [4]:
df = spark.read.csv('fakefriends.csv')

In [5]:
# Show data
df.show()

+---+--------+---+---------------+
|_c0|     _c1|_c2|            _c3|
+---+--------+---+---------------+
| Id|    Name|Age|NumberOfFriends|
|  0|    Will| 33|            385|
|  1|Jean-Luc| 26|              2|
|  2|    Hugh| 55|            221|
|  3|  Deanna| 40|            465|
|  4|   Quark| 68|             21|
|  5|  Weyoun| 59|            318|
|  6|  Gowron| 37|            220|
|  7|    Will| 54|            307|
|  8|  Jadzia| 38|            380|
|  9|    Hugh| 27|            181|
| 10|     Odo| 53|            191|
| 11|     Ben| 57|            372|
| 12|   Keiko| 54|            253|
| 13|Jean-Luc| 56|            444|
| 14|    Hugh| 43|             49|
| 15|     Rom| 36|             49|
| 16|  Weyoun| 22|            323|
| 17|     Odo| 35|             13|
| 18|Jean-Luc| 45|            455|
+---+--------+---+---------------+
only showing top 20 rows



In [6]:
# Take header in account
df = spark.read.option('header', 'true').csv('fakefriends.csv')

In [7]:
df.head(3)

[Row(Id='0', Name='Will', Age='33', NumberOfFriends='385'),
 Row(Id='1', Name='Jean-Luc', Age='26', NumberOfFriends='2'),
 Row(Id='2', Name='Hugh', Age='55', NumberOfFriends='221')]

In [8]:
# See Schema
# You will notice by default all are string.
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- NumberOfFriends: string (nullable = true)



In [9]:
# Infer Schema based on Data
df = spark.read.option('header', 'true').csv('fakefriends.csv', inferSchema=True)
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- NumberOfFriends: integer (nullable = true)



In [10]:
df.head(3)

[Row(Id=0, Name='Will', Age=33, NumberOfFriends=385),
 Row(Id=1, Name='Jean-Luc', Age=26, NumberOfFriends=2),
 Row(Id=2, Name='Hugh', Age=55, NumberOfFriends=221)]

In [11]:
# Header and InferSchema
df = spark.read.csv('fakefriends.csv', header=True, inferSchema=True)
df.printSchema()
df.head(3)

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- NumberOfFriends: integer (nullable = true)



[Row(Id=0, Name='Will', Age=33, NumberOfFriends=385),
 Row(Id=1, Name='Jean-Luc', Age=26, NumberOfFriends=2),
 Row(Id=2, Name='Hugh', Age=55, NumberOfFriends=221)]

In [12]:
# get column names
df.columns

['Id', 'Name', 'Age', 'NumberOfFriends']

In [13]:
# Select only specific column
# Select method returns a new data frame
df.select(['Name', 'Age']).head(3)

[Row(Name='Will', Age=33),
 Row(Name='Jean-Luc', Age=26),
 Row(Name='Hugh', Age=55)]

In [14]:
# Check Column data types
df.dtypes

[('Id', 'int'), ('Name', 'string'), ('Age', 'int'), ('NumberOfFriends', 'int')]

In [15]:
# Describe Dataframe stats
df.describe()

# describe method return another dataframe which can be used for show
df.describe().show()

+-------+------------------+----+------------------+------------------+
|summary|                Id|Name|               Age|   NumberOfFriends|
+-------+------------------+----+------------------+------------------+
|  count|               503| 502|               501|               502|
|   mean|             251.0|null| 43.68263473053892|247.59561752988049|
| stddev|145.34785860135676|null|14.860318901164648|147.67316308161114|
|    min|                 0|Anil|                18|                 1|
|    max|               502|Worf|                69|               499|
+-------+------------------+----+------------------+------------------+



In [16]:
# Add a column - use WithColumn method
df = df.withColumn('Age after 2 years', df['Age'] + 2)
df.head(3)

[Row(Id=0, Name='Will', Age=33, NumberOfFriends=385, Age after 2 years=35),
 Row(Id=1, Name='Jean-Luc', Age=26, NumberOfFriends=2, Age after 2 years=28),
 Row(Id=2, Name='Hugh', Age=55, NumberOfFriends=221, Age after 2 years=57)]

In [17]:
# Drop the column using drop
df = df.drop('Age after 2 years')
df.head(3)

[Row(Id=0, Name='Will', Age=33, NumberOfFriends=385),
 Row(Id=1, Name='Jean-Luc', Age=26, NumberOfFriends=2),
 Row(Id=2, Name='Hugh', Age=55, NumberOfFriends=221)]

In [18]:
# Rename Column
df.withColumnRenamed('Name', 'User Name').head(3)

[Row(Id=0, User Name='Will', Age=33, NumberOfFriends=385),
 Row(Id=1, User Name='Jean-Luc', Age=26, NumberOfFriends=2),
 Row(Id=2, User Name='Hugh', Age=55, NumberOfFriends=221)]

In [19]:
# Drop rows where any value is null
df.na.drop().tail(5)

[Row(Id=495, Name='Data', Age=46, NumberOfFriends=155),
 Row(Id=496, Name='Gowron', Age=39, NumberOfFriends=275),
 Row(Id=497, Name='Lwaxana', Age=34, NumberOfFriends=423),
 Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12)]

In [20]:
# Drop rows where all values are null
df.na.drop(how = 'all').tail(5)

[Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12),
 Row(Id=500, Name=None, Age=31, NumberOfFriends=12),
 Row(Id=501, Name='Anil', Age=None, NumberOfFriends=15),
 Row(Id=502, Name='Suresh', Age=None, NumberOfFriends=None)]

In [21]:
# Drop row if atleast more than 2 null values i.e. threshold is 2
df.na.drop(how = 'any', thresh=1).tail(5)

[Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12),
 Row(Id=500, Name=None, Age=31, NumberOfFriends=12),
 Row(Id=501, Name='Anil', Age=None, NumberOfFriends=15),
 Row(Id=502, Name='Suresh', Age=None, NumberOfFriends=None)]

In [22]:
# Drop row if there is null in specific column/s
df.na.drop(how = 'any', subset=['Name', 'NumberOfFriends']).tail(5)

[Row(Id=496, Name='Gowron', Age=39, NumberOfFriends=275),
 Row(Id=497, Name='Lwaxana', Age=34, NumberOfFriends=423),
 Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12),
 Row(Id=501, Name='Anil', Age=None, NumberOfFriends=15)]

In [23]:
# Filling Missing values for String 
df.na.fill('Misisng', subset=['Name']).tail(5)

[Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12),
 Row(Id=500, Name='Misisng', Age=31, NumberOfFriends=12),
 Row(Id=501, Name='Anil', Age=None, NumberOfFriends=15),
 Row(Id=502, Name='Suresh', Age=None, NumberOfFriends=None)]

In [24]:
# Use imputer funtion for filling numeric values
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['Age', 'NumberOfFriends'],
    outputCols=[f"{colname}_imputed" for colname in ['Age', 'NumberOfFriends']]
).setStrategy("mean") # we can also use median instead of mean

# Fit data Frame to Imputer 
imputer.fit(df).transform(df).tail(5)

[Row(Id=498, Name='Jadzia', Age=62, NumberOfFriends=36, Age_imputed=62, NumberOfFriends_imputed=36),
 Row(Id=499, Name='Leeta', Age=62, NumberOfFriends=12, Age_imputed=62, NumberOfFriends_imputed=12),
 Row(Id=500, Name=None, Age=31, NumberOfFriends=12, Age_imputed=31, NumberOfFriends_imputed=12),
 Row(Id=501, Name='Anil', Age=None, NumberOfFriends=15, Age_imputed=43, NumberOfFriends_imputed=15),
 Row(Id=502, Name='Suresh', Age=None, NumberOfFriends=None, Age_imputed=43, NumberOfFriends_imputed=247)]

In [25]:
# Filter
df.filter("age > 65").select(['Name', 'Age']).show()

+--------+---+
|    Name|Age|
+--------+---+
|   Quark| 68|
|     Odo| 67|
|    Hugh| 67|
|   Keiko| 69|
|   Dukat| 67|
|   Quark| 66|
|     Ben| 66|
|   Nerys| 69|
|   Keiko| 69|
|    Morn| 67|
|     Ben| 69|
|Jean-Luc| 68|
|     Rom| 66|
|  Gowron| 67|
|  Kasidy| 67|
|   Nerys| 67|
|  Martok| 68|
|Jean-Luc| 68|
|   Brunt| 67|
|    Morn| 69|
+--------+---+
only showing top 20 rows



In [26]:
# Concatenating multiple conditions. Not of denoted by ~
df.filter( (df['Age'] > 65) & (df['NumberOfFriends'] < 100)).show()

+---+------+---+---------------+
| Id|  Name|Age|NumberOfFriends|
+---+------+---+---------------+
|  4| Quark| 68|             21|
| 62| Keiko| 69|              9|
|116|   Ben| 69|             75|
|233|Gowron| 67|             70|
|249| Nerys| 66|             41|
|254|  Ezri| 67|             79|
|329| Dukat| 67|             35|
|354|Kasidy| 69|             15|
|396| Keiko| 67|             38|
+---+------+---+---------------+



In [27]:
# Aggregations 
df.select(['Age','NumberOfFriends']).groupBy(['Age']).max('NumberOfFriends').show()

+---+--------------------+
|Age|max(NumberOfFriends)|
+---+--------------------+
| 31|                 481|
| 65|                 443|
| 53|                 451|
| 34|                 423|
| 28|                 378|
| 26|                 492|
| 27|                 471|
| 44|                 499|
| 22|                 478|
| 47|                 488|
| 52|                 487|
| 40|                 465|
| 20|                 384|
| 57|                 465|
| 54|                 462|
| 48|                 439|
| 19|                 404|
| 64|                 499|
| 41|                 397|
| 43|                 428|
+---+--------------------+
only showing top 20 rows



In [28]:
# For multiple different types of aggregation, use agg method
df.groupBy(['Age']).agg({'Age': 'count', 'NumberOfFriends': 'sum'}).show()

+---+----------+--------------------+
|Age|count(Age)|sum(NumberOfFriends)|
+---+----------+--------------------+
| 31|         9|                2150|
| 65|         5|                1491|
| 53|         7|                1560|
| 34|         6|                1473|
| 28|        10|                2091|
| 26|        17|                4115|
| 27|         8|                1825|
| 44|        12|                3386|
| 22|         7|                1445|
| 47|         9|                2099|
| 52|        11|                3747|
| 40|        17|                4264|
| 20|         5|                 825|
| 57|        12|                3106|
| 54|        13|                3615|
| 48|        10|                2814|
| 19|        11|                2346|
| 64|        12|                3376|
| 41|         9|                2417|
| 43|         7|                1614|
+---+----------+--------------------+
only showing top 20 rows



## MLlib

In [50]:
# Lets read some data
df_spark = spark.read.csv('EmployeeSalaries.csv', header=True, inferSchema=True)

In [51]:
df_spark.show()

+-------+---+---+----------+--------+
|   Name|Age|Sex|Experience|  Salary|
+-------+---+---+----------+--------+
|   Ajit| 22|  M|         2| 20000.0|
|  Sumit| 34|  M|        12|100000.0|
|   Anil| 54|  M|        22|200000.0|
| Zubair| 27|  M|         5| 40000.0|
|Shubham| 39|  M|        12| 50000.0|
|   Neha| 26|  F|         4| 30000.0|
|  Kapil| 36|  M|        13| 80000.0|
| Prachi| 28|  F|         6| 47000.0|
+-------+---+---+----------+--------+



In [58]:
# Convert Categorifal features to numeric for
from pyspark.ml.feature import StringIndexer
categorical_cols = ['Sex']

indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=[f"{col}_index" for col in categorical_cols]
)

df_spark = indexer.fit(df_spark).transform(df_spark)

IllegalArgumentException: requirement failed: Output column Sex_index already exists.

In [59]:
# Rename Sex column and rename sex_indes column
df_spark =df_spark.drop('Sex').withColumnRenamed('Sex_index', 'Sex')
df_spark.show()

+-------+---+----------+--------+---+
|   Name|Age|Experience|  Salary|Sex|
+-------+---+----------+--------+---+
|   Ajit| 22|         2| 20000.0|0.0|
|  Sumit| 34|        12|100000.0|0.0|
|   Anil| 54|        22|200000.0|0.0|
| Zubair| 27|         5| 40000.0|0.0|
|Shubham| 39|        12| 50000.0|0.0|
|   Neha| 26|         4| 30000.0|1.0|
|  Kapil| 36|        13| 80000.0|0.0|
| Prachi| 28|         6| 47000.0|1.0|
+-------+---+----------+--------+---+



In [60]:
## In Spark we have to group the independent features into single column. We do this with Vector Assembler
from pyspark.ml.feature import VectorAssembler
feature_group = VectorAssembler(
    inputCols=['Age', 'Experience', 'Sex'],
    outputCol="Independent Feature"
)

training = feature_group.transform(df_spark)

# Now we need only 2 columns 1 independent feature and other feature we need to predict
training = training.select(["Independent Feature", "Salary"])

training.show()

+-------------------+--------+
|Independent Feature|  Salary|
+-------------------+--------+
|     [22.0,2.0,0.0]| 20000.0|
|    [34.0,12.0,0.0]|100000.0|
|    [54.0,22.0,0.0]|200000.0|
|     [27.0,5.0,0.0]| 40000.0|
|    [39.0,12.0,0.0]| 50000.0|
|     [26.0,4.0,1.0]| 30000.0|
|    [36.0,13.0,0.0]| 80000.0|
|     [28.0,6.0,1.0]| 47000.0|
+-------------------+--------+



In [62]:
# Now that we have data, lets do Train and test split
train_data, test_data = training.randomSplit([0.75, 0.25])

In [63]:
# Now run Linear Regressiuon
from pyspark.ml.regression import LinearRegression
regressor = LinearRegression(featuresCol="Independent Feature", labelCol="Salary").fit(train_data)

# We can see Coeficient for the Regressor
print(regressor.coefficients)

# We can see Intercepts of Regressor
print(regressor.intercept)

[1223.3502538061841,3944.162436549359,38.07106599051356]
-14289.340101504273


In [64]:
# Lets evaluate on test data
test_results = regressor.evaluate(test_data)
test_results.predictions.show()

+-------------------+--------+------------------+
|Independent Feature|  Salary|        prediction|
+-------------------+--------+------------------+
|    [34.0,12.0,0.0]|100000.0| 74634.51776649829|
|    [39.0,12.0,0.0]| 50000.0| 80751.26903552921|
|    [54.0,22.0,0.0]|200000.0|138543.14720811558|
+-------------------+--------+------------------+



