In [1]:
import spark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [4]:
df = spark.read.json("C://Users//VPraveenK//Downloads//people.json")

In [5]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
df.columns

['age', 'name']

In [8]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [9]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [10]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [11]:
data_schema = [StructField('age',IntegerType(),True),
                StructField('name',StringType(),True)]

In [12]:
final_struc = StructType(fields=data_schema)

In [13]:
df = spark.read.json("C://Users//VPraveenK//Downloads//people.json", schema=final_struc)

In [14]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [15]:
type(df['age'])

pyspark.sql.column.Column

In [16]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [17]:
df.head(2)[0]

Row(age=None, name='Michael')

In [18]:
type(df.head(2)[0])

pyspark.sql.types.Row

In [19]:
df.select(['age','name'])

DataFrame[age: int, name: string]

In [20]:
df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [21]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [22]:
df.withColumnRenamed('age', 'new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



In [23]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [24]:
df.createOrReplaceTempView('people')

In [25]:
results = spark.sql("SELECT * FROM people")

In [26]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [27]:
new_results = spark.sql("SELECT * FROM people WHERE age=30")

In [28]:
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [29]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [30]:
dfe = spark.read.csv("C://Users//VPraveenK//Downloads//AAPL.csv", inferSchema=True,header=True)
# We dont have inferschema option with json files

In [31]:
dfe.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [32]:
dfe.show()

+-------------------+----------+----------+----------+----------+----------+---------+
|               Date|      Open|      High|       Low|     Close| Adj Close|   Volume|
+-------------------+----------+----------+----------+----------+----------+---------+
|2021-11-29 00:00:00|159.369995|161.190002|158.789993|160.240005|159.315384| 88748200|
|2021-11-30 00:00:00|159.990005|165.520004|159.919998|165.300003|164.346176|174048100|
|2021-12-01 00:00:00|167.479996|170.300003|164.529999|164.770004|163.819244|152052500|
|2021-12-02 00:00:00|158.740005|164.199997|157.800003|163.759995|162.815048|136739200|
|2021-12-03 00:00:00|164.020004|164.960007|159.720001|161.839996|160.906128|118023100|
|2021-12-06 00:00:00|164.289993|167.880005|164.279999|165.320007|164.366058|107497000|
|2021-12-07 00:00:00|169.080002|171.580002|168.339996|171.179993|170.192215|120405400|
|2021-12-08 00:00:00|172.130005|175.960007|170.699997|175.080002|174.069733|116998900|
|2021-12-09 00:00:00|174.910004|    176.75|

In [33]:
dfe.head(3)[0]

Row(Date=datetime.datetime(2021, 11, 29, 0, 0), Open=159.369995, High=161.190002, Low=158.789993, Close=160.240005, Adj Close=159.315384, Volume=88748200)

In [34]:
dfe.filter("Close<500").show()

+-------------------+----------+----------+----------+----------+----------+---------+
|               Date|      Open|      High|       Low|     Close| Adj Close|   Volume|
+-------------------+----------+----------+----------+----------+----------+---------+
|2021-11-29 00:00:00|159.369995|161.190002|158.789993|160.240005|159.315384| 88748200|
|2021-11-30 00:00:00|159.990005|165.520004|159.919998|165.300003|164.346176|174048100|
|2021-12-01 00:00:00|167.479996|170.300003|164.529999|164.770004|163.819244|152052500|
|2021-12-02 00:00:00|158.740005|164.199997|157.800003|163.759995|162.815048|136739200|
|2021-12-03 00:00:00|164.020004|164.960007|159.720001|161.839996|160.906128|118023100|
|2021-12-06 00:00:00|164.289993|167.880005|164.279999|165.320007|164.366058|107497000|
|2021-12-07 00:00:00|169.080002|171.580002|168.339996|171.179993|170.192215|120405400|
|2021-12-08 00:00:00|172.130005|175.960007|170.699997|175.080002|174.069733|116998900|
|2021-12-09 00:00:00|174.910004|    176.75|

In [35]:
dfe.filter("Close<500").select('open','Close').show()

+----------+----------+
|      open|     Close|
+----------+----------+
|159.369995|160.240005|
|159.990005|165.300003|
|167.479996|164.770004|
|158.740005|163.759995|
|164.020004|161.839996|
|164.289993|165.320007|
|169.080002|171.179993|
|172.130005|175.080002|
|174.910004|174.559998|
|175.210007|179.449997|
|181.119995|175.740005|
|    175.25|174.330002|
|175.110001|179.300003|
|179.279999|172.259995|
|169.929993|171.139999|
|168.279999|    169.75|
|171.559998|172.990005|
|173.039993|175.639999|
|175.850006|176.279999|
|177.089996|180.330002|
+----------+----------+
only showing top 20 rows



In [36]:
dfe.filter(dfe['Close'] <500).select('open', 'Close').show()

+----------+----------+
|      open|     Close|
+----------+----------+
|159.369995|160.240005|
|159.990005|165.300003|
|167.479996|164.770004|
|158.740005|163.759995|
|164.020004|161.839996|
|164.289993|165.320007|
|169.080002|171.179993|
|172.130005|175.080002|
|174.910004|174.559998|
|175.210007|179.449997|
|181.119995|175.740005|
|    175.25|174.330002|
|175.110001|179.300003|
|179.279999|172.259995|
|169.929993|171.139999|
|168.279999|    169.75|
|171.559998|172.990005|
|173.039993|175.639999|
|175.850006|176.279999|
|177.089996|180.330002|
+----------+----------+
only showing top 20 rows



In [37]:
dfe.filter(dfe['Close']<200 and dfe['Open']>200).show()
# Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [38]:
dfe.filter(dfe['Close']<200 & dfe['Open']>200).show()
# Separate the two conditions with parenthesis 

Py4JError: An error occurred while calling o108.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)



In [39]:
dfe.filter( (dfe['Close']<150) & (dfe['Open']>100)).show()

+-------------------+----------+----------+----------+----------+----------+---------+
|               Date|      Open|      High|       Low|     Close| Adj Close|   Volume|
+-------------------+----------+----------+----------+----------+----------+---------+
|2022-05-11 00:00:00|     153.5|155.449997|145.809998|     146.5|146.054504|142689800|
|2022-05-12 00:00:00|142.770004|146.199997|138.800003|142.559998| 142.12648|182602000|
|2022-05-13 00:00:00|144.589996|148.100006|143.110001|147.110001|146.662659|113990900|
|2022-05-16 00:00:00|145.550003|147.520004|144.179993|145.539993|145.097427| 86643800|
|2022-05-17 00:00:00|148.860001|149.770004|146.679993|149.240005|148.786179| 78336300|
|2022-05-18 00:00:00|146.850006|147.360001|139.899994|140.820007|140.391785|109742900|
|2022-05-19 00:00:00|139.880005|141.660004|136.600006|137.350006|136.932327|136095600|
|2022-05-20 00:00:00|139.089996|140.699997|132.610001|137.589996|137.171585|137426100|
|2022-05-23 00:00:00|137.789993|143.259995|

In [40]:
result = dfe.filter(dfe['Low'] == 132.610001).collect()

In [41]:
result

[Row(Date=datetime.datetime(2022, 5, 20, 0, 0), Open=139.089996, High=140.699997, Low=132.610001, Close=137.589996, Adj Close=137.171585, Volume=137426100)]

In [42]:
row= result[0]

In [43]:
row.asDict()

{'Date': datetime.datetime(2022, 5, 20, 0, 0),
 'Open': 139.089996,
 'High': 140.699997,
 'Low': 132.610001,
 'Close': 137.589996,
 'Adj Close': 137.171585,
 'Volume': 137426100}

In [44]:
row.asDict()['Volume']

137426100

GroupBy and Aggregate Functions

In [50]:
from pyspark.sql import SparkSession

In [53]:
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [54]:
dfee = spark.read.csv("C://Users//VPraveenK//Downloads//sales_info.csv", inferSchema=True, header=True)

In [55]:
dfee.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [56]:
dfee.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [57]:
dfee.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x11f6348e140>

In [58]:
dfee.groupBy("Company").mean()

DataFrame[Company: string, avg(Sales): double]

In [59]:
dfee.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [60]:
dfee.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [61]:
dfee.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [62]:
group_data = dfee.groupBy('Company')

In [63]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [64]:
from pyspark.sql.functions import countDistinct,avg,stddev

In [65]:
dfee.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [66]:
dfee.select(avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [67]:
dfee.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [68]:
from pyspark.sql.functions import format_number

In [69]:
sales_std = dfee.select(stddev("Sales").alias('std'))

In [70]:
sales_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



In [71]:
sales_std.select(format_number('std',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [72]:
dfee.orderBy("Sales").show() #Ascending list

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [73]:
dfee.orderBy(dfee['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



Missing Data

In [74]:
import spark

In [75]:
from pyspark.sql import SparkSession

In [76]:
spark = SparkSession.builder.appName('miss').getOrCreate()

In [77]:
dfef = spark.read.csv("C://Users//VPraveenK//Documents//ContainsNull.csv",header = True, inferSchema= True)

In [78]:
dfef.show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



In [79]:
dfef.na.drop().show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|  456|
+----+-----+-----+



In [80]:
dfef.na.drop(thresh=2).show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



In [81]:
dfef.na.drop(how='any').show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|  456|
+----+-----+-----+



In [82]:
dfef.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



In [83]:
dfef.printSchema()

root
 |-- id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: integer (nullable = true)



In [84]:
dfef.na.fill(0).show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp1| John|    0|
|emp2| null|    0|
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



In [85]:
dfef.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|  345|
|emp4|  Cindy|  456|
+----+-------+-----+



In [86]:
from pyspark.sql.functions import mean

In [87]:
mean_val = dfef.select(mean(dfef['Sales'])).collect()

In [88]:
mean_val

[Row(avg(Sales)=400.5)]

In [89]:
mean_sales = mean_val[0][0]

In [90]:
dfef.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp1| John|  400|
|emp2| null|  400|
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



In [91]:
dfef.na.fill(dfef.select(mean(dfef['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  id| Name|Sales|
+----+-----+-----+
|emp1| John|  400|
|emp2| null|  400|
|emp3| null|  345|
|emp4|Cindy|  456|
+----+-----+-----+



Dates and Timestamps

In [92]:
from pyspark.sql import SparkSession

In [93]:
spark = SparkSession.builder.appName('dates').getOrCreate()

In [94]:
deef = spark.read.csv("C://Users//VPraveenK//Downloads//AAPL.csv", inferSchema=True,header=True)

In [95]:
deef.head(1)

[Row(Date=datetime.datetime(2021, 11, 29, 0, 0), Open=159.369995, High=161.190002, Low=158.789993, Close=160.240005, Adj Close=159.315384, Volume=88748200)]

In [96]:
deef.select(['Date','Open']).show()

+-------------------+----------+
|               Date|      Open|
+-------------------+----------+
|2021-11-29 00:00:00|159.369995|
|2021-11-30 00:00:00|159.990005|
|2021-12-01 00:00:00|167.479996|
|2021-12-02 00:00:00|158.740005|
|2021-12-03 00:00:00|164.020004|
|2021-12-06 00:00:00|164.289993|
|2021-12-07 00:00:00|169.080002|
|2021-12-08 00:00:00|172.130005|
|2021-12-09 00:00:00|174.910004|
|2021-12-10 00:00:00|175.210007|
|2021-12-13 00:00:00|181.119995|
|2021-12-14 00:00:00|    175.25|
|2021-12-15 00:00:00|175.110001|
|2021-12-16 00:00:00|179.279999|
|2021-12-17 00:00:00|169.929993|
|2021-12-20 00:00:00|168.279999|
|2021-12-21 00:00:00|171.559998|
|2021-12-22 00:00:00|173.039993|
|2021-12-23 00:00:00|175.850006|
|2021-12-27 00:00:00|177.089996|
+-------------------+----------+
only showing top 20 rows



In [105]:
from pyspark.sql.functions import (dayofmonth,hour,
                                    dayofyear,month,
                                    year,weekofyear,
                                    format_number,date_format)

In [106]:
deef.select(dayofmonth(deef['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|              29|
|              30|
|               1|
|               2|
|               3|
|               6|
|               7|
|               8|
|               9|
|              10|
|              13|
|              14|
|              15|
|              16|
|              17|
|              20|
|              21|
|              22|
|              23|
|              27|
+----------------+
only showing top 20 rows



In [107]:
deef.select(year(deef['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
|      2021|
+----------+
only showing top 20 rows



In [108]:
newdeef = deef.withColumn("year", year(deef['Date']))

In [109]:
newdeef.groupBy("year").mean().show()

+----+------------------+------------------+------------------+-----------------+------------------+-------------------+---------+
|year|         avg(Open)|         avg(High)|          avg(Low)|       avg(Close)|    avg(Adj Close)|        avg(Volume)|avg(year)|
+----+------------------+------------------+------------------+-----------------+------------------+-------------------+---------+
|2022|156.38745619736827|158.53872803508767|154.31052614473688|156.4867981973685|155.98794534649124|8.857201578947368E7|   2022.0|
|2021|172.03249995833335|174.60916899999998|       170.2612495|    172.654167125|171.65789912499997|       1.12815125E8|   2021.0|
+----+------------------+------------------+------------------+-----------------+------------------+-------------------+---------+



In [115]:
resultdeef = newdeef.groupBy("year").mean().select(["year","avg(Close)"])
# resultdeef = newdeef.groupBy("year").mean().select(["year","avg(Close)"]).show()
# If we add show() then we get error for below cell 'NoneType' object has no attribute 'withColumnRenamed'

In [117]:
newresult = resultdeef.withColumnRenamed("avg(Close)","Average Closing Price").show()

+----+---------------------+
|year|Average Closing Price|
+----+---------------------+
|2022|    156.4867981973685|
|2021|        172.654167125|
+----+---------------------+



In [127]:
newwresult = resultdeef.select(['year',format_number('Average Closing Price',2).alias("Avg Close")])

AnalysisException: Column '`Average Closing Price`' does not exist. Did you mean one of the following? [avg(Close), year];
'Project [year#1503, format_number('Average Closing Price, 2) AS Avg Close#1767]
+- Project [year#1503, avg(Close)#1707]
   +- Aggregate [year#1503], [year#1503, avg(Open#1251) AS avg(Open)#1704, avg(High#1252) AS avg(High)#1705, avg(Low#1253) AS avg(Low)#1706, avg(Close#1254) AS avg(Close)#1707, avg(Adj Close#1255) AS avg(Adj Close)#1708, avg(Volume#1256) AS avg(Volume)#1709, avg(year#1503) AS avg(year)#1710]
      +- Project [Date#1250, Open#1251, High#1252, Low#1253, Close#1254, Adj Close#1255, Volume#1256, year(cast(Date#1250 as date)) AS year#1503]
         +- Relation [Date#1250,Open#1251,High#1252,Low#1253,Close#1254,Adj Close#1255,Volume#1256] csv


EXERCISE

In [128]:
from pyspark.sql import SparkSession

In [129]:
spark = SparkSession.builder.appName("sol").getOrCreate()

In [130]:
stock = spark.read.csv("C://Users//VPraveenK//Downloads//walmart_stock.csv", inferSchema=True, header=True)

In [131]:
stock.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [132]:
stock.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [133]:
stock.head(5)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [134]:
for row in stock.head(5):
    print(row)
    print('\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




In [135]:
stock.describe()

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [136]:
stock.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [137]:
from pyspark.sql.functions import format_number

In [138]:
result = stock.describe()

In [139]:
result.select(result['summary'],
            format_number(result['Open'].cast('float'),2).alias('Open')).show()

+-------+--------+
|summary|    Open|
+-------+--------+
|  count|1,258.00|
|   mean|   72.36|
| stddev|    6.77|
|    min|   56.39|
|    max|   90.80|
+-------+--------+



In [140]:
stock2 = stock.withColumn("HV Ratio",stock['High']/stock['Volume'])

In [141]:
stock2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [142]:
stock.orderBy(stock['High'].desc()).show()

+-------------------+-----------------+-----------------+-----------------+-----------------+--------+-----------------+
|               Date|             Open|             High|              Low|            Close|  Volume|        Adj Close|
+-------------------+-----------------+-----------------+-----------------+-----------------+--------+-----------------+
|2015-01-13 00:00:00|        90.800003|        90.970001|            88.93|        89.309998| 8215400|        83.825448|
|2015-01-08 00:00:00|        89.209999|90.66999799999999|            89.07|        90.470001|12713600|84.91421600000001|
|2015-01-09 00:00:00|            90.32|        90.389999|            89.25|        89.349998| 8522500|        83.862993|
|2015-01-12 00:00:00|        89.360001|        90.309998|        89.220001|        90.019997| 7372500|        84.491846|
|2015-01-23 00:00:00|88.41999799999999|        89.260002|        87.889999|        88.510002| 7565800|83.07458100000001|
|2015-01-26 00:00:00|        88.

In [143]:
stock.orderBy(stock['High'].desc()).head(1)[0][0]

datetime.datetime(2015, 1, 13, 0, 0)

In [144]:
from pyspark.sql.functions import mean
stock.select(format_number(mean("Close").cast('float'),2).alias('Average')).show()

+-------+
|Average|
+-------+
|  72.39|
+-------+



In [145]:
from pyspark.sql.functions import max,min
stock.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [146]:
stock.filter('Close < 60').count()

81

In [147]:
stock.filter(stock['Close'] < 60).count()

81

In [148]:
from pyspark.sql.functions import count
result = stock.filter(stock['Close']<60)
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



In [149]:
#What percentage of time was High greater than 80 dollars?
(stock.filter(stock['High']>80).count() / stock.count())*100

9.141494435612083

In [150]:
# Pearson correlation between high and Volume
from pyspark.sql.functions import corr
stock.select(corr('High','Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [151]:
 #max High per year?
from pyspark.sql.functions import year
yearstock = stock.withColumn("Year",year(stock['Date']))
max_stock = yearstock.groupBy('Year').max()
max_stock.select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [152]:
# Average close for each Calender Month
from pyspark.sql.functions import month
monthstock = stock.withColumn('Month', month('Date'))
monthavgs = monthstock.select(['Month','Close']).groupBy('Month').mean()
monthavgs.select('Month','avg(close)').orderBy('Month').show()

+-----+-----------------+
|Month|       avg(close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



Linear Regression

In [153]:
import spark

In [154]:
from pyspark.sql import  SparkSession

In [155]:
spark = SparkSession.builder.appName('lrex').getOrCreate()

In [156]:
from pyspark.ml.regression import LinearRegression

In [157]:
training = spark.read.format('libsvm').load("C://Users//VPraveenK//Downloads//sample_linear_regression_data.txt")

In [158]:
training.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [159]:
lr = LinearRegression(featuresCol='features',labelCol='label',predictionCol='prediction')

In [160]:
lrModel = lr.fit(training)

In [161]:
lrModel.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [162]:
lrModel.intercept

0.14228558260358093

In [163]:
training_summary = lrModel.summary

In [164]:
training_summary.rootMeanSquaredError

10.16309157133015

In [165]:
all_data = spark.read.format('libsvm').load("C://Users//VPraveenK//Downloads//sample_linear_regression_data.txt")

In [166]:
train_data,test_data = all_data.randomSplit([0.7,0.3]) #70% and 30%

In [167]:
test_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                148|
|   mean| 0.6242911036825065|
| stddev| 10.682176136695022|
|    min|-28.046018037776633|
|    max| 26.903524792043335|
+-------+-------------------+



In [168]:
train_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                353|
|   mean|  0.102850471882945|
| stddev| 10.172797593324413|
|    min|-28.571478869743427|
|    max|  27.78383192005107|
+-------+-------------------+



In [169]:
correct_model = lr.fit(train_data)

In [170]:
test_results = correct_model.evaluate(test_data)

In [171]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|-27.237489785232942|
|-22.629085701243817|
| -21.11233995644708|
|-20.704043354516127|
|-22.471419335305992|
|-20.812629100708588|
| -18.53737489386841|
|-17.208102603070888|
|-18.936924924719737|
|-17.667417285886174|
| -11.18331230925783|
|-12.869409757086999|
| -9.074972518835246|
| -13.41202323516412|
| -8.642635743440133|
|-11.640421671649046|
|-10.447806428672157|
|-14.513540169072312|
|-11.561906537443441|
|-14.531470117308753|
+-------------------+
only showing top 20 rows



In [172]:
unlabel_data = test_data.select('features')

In [173]:
unlabel_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [174]:
predictions = correct_model.transform(unlabel_data)

In [175]:
predictions.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -0.8085282525436912|
|(10,[0,1,2,3,4,5,...|  -4.107121481357907|
|(10,[0,1,2,3,4,5,...|  -2.398544133875893|
|(10,[0,1,2,3,4,5,...| -0.7283444096496801|
|(10,[0,1,2,3,4,5,...|    2.25934207634732|
|(10,[0,1,2,3,4,5,...|  0.9280683264351626|
|(10,[0,1,2,3,4,5,...| -1.3356161441999945|
|(10,[0,1,2,3,4,5,...|-0.11861812960506203|
|(10,[0,1,2,3,4,5,...|   3.213409311671166|
|(10,[0,1,2,3,4,5,...|   2.610934311343741|
|(10,[0,1,2,3,4,5,...| -2.6837755859009373|
|(10,[0,1,2,3,4,5,...| -0.1705183070176165|
|(10,[0,1,2,3,4,5,...|  -3.698254480415952|
|(10,[0,1,2,3,4,5,...|  0.8534474463079325|
|(10,[0,1,2,3,4,5,...|  -3.768310659400028|
|(10,[0,1,2,3,4,5,...|-0.45392960688621237|
|(10,[0,1,2,3,4,5,...| -1.4095439367572686|
|(10,[0,1,2,3,4,5,...|   2.897764904056686|
|(10,[0,1,2,3,4,5,...|  0.9617761955344086|
|(10,[0,1,2,3,4,5,...|   4.17555

In [176]:
import spark

In [177]:
from pyspark.sql import SparkSession

In [178]:
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [179]:
from pyspark.ml.regression import LinearRegression 

In [180]:
data = spark.read.csv("C://Users//VPraveenK//Downloads//Ecommerce_Customers.csv", inferSchema= True, header= True)

In [181]:
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [182]:
data.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)]

In [183]:
for item in data.head(1)[0]:
    print(item)

mstephenson@fernandez.com
835 Frank TunnelWrightmouth, MI 82180-9605
Violet
34.49726772511229
12.65565114916675
39.57766801952616
4.0826206329529615
587.9510539684005


In [184]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [185]:
data.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [186]:
assembler = VectorAssembler(inputCols=['Avg Session Length','Time on App','Time on Website','Length of Membership'], outputCol='features')

In [187]:
output = assembler.transform(data)

In [188]:
output.select('features').show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [189]:
output.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]))]

In [190]:
final_data = output.select('features', 'Yearly Amount Spent')

In [191]:
final_data.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

In [192]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [193]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                343|
|   mean| 498.83546646884815|
| stddev|  77.75551918443405|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [194]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                157|
|   mean| 500.35958044891976|
| stddev|  82.86424513124743|
|    min|   266.086340948469|
|    max|  744.2218671047146|
+-------+-------------------+



In [195]:
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [196]:
lr_model =lr.fit(train_data)

In [197]:
test_results = lr_model.evaluate(test_data)

In [198]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -4.999008431439563|
|-16.822849264598176|
|  6.733576026478659|
|-6.0433862291111495|
|-21.444735974782247|
| -7.100815210702194|
|-0.4842503216920022|
|-3.8711396594464986|
| 22.199322409977412|
|  2.915510923813258|
| 0.2987066171835977|
| 3.0038746859383423|
| 6.7910827208303886|
|  -7.26656407274902|
|  -6.05145095353663|
| 1.8690989603808816|
|-2.6102612637698144|
| 0.6887425248808086|
|-2.5391714048014364|
|-14.026839590640975|
+-------------------+
only showing top 20 rows



In [199]:
test_results.rootMeanSquaredError

10.179280789330873

In [200]:
test_results.r2 # r square - Variance 

0.9848128988389966

In [201]:
final_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                500|
|   mean|  499.3140382585909|
| stddev|   79.3147815497068|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [202]:
unlabeled_data = test_data.select('features')

In [203]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[30.4925366965402...|
|[30.8162006488763...|
|[30.9716756438877...|
|[31.0613251567161...|
|[31.1239743499119...|
|[31.1280900496166...|
|[31.2606468698795...|
|[31.2681042107507...|
|[31.2834474760581...|
|[31.3091926408918...|
|[31.3895854806643...|
|[31.4459724827577...|
|[31.6548096756927...|
|[31.7207699002873...|
|[31.7242025238451...|
|[31.7366356860502...|
|[31.8186165667690...|
|[31.8293464559211...|
|[31.8530748017465...|
|[31.9365486184489...|
+--------------------+
only showing top 20 rows



In [204]:
predictions = lr_model.transform(unlabeled_data)

In [205]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[30.4925366965402...| 287.4702541513541|
|[30.8162006488763...| 282.9091902130672|
|[30.9716756438877...|487.90503373041406|
|[31.0613251567161...|493.59884428701275|
|[31.1239743499119...|  508.391789814548|
|[31.1280900496166...| 564.3535019577569|
|[31.2606468698795...| 421.8108815786434|
|[31.2681042107507...| 427.3416728332704|
|[31.2834474760581...|   569.58176701569|
|[31.3091926408918...|429.80520691612037|
|[31.3895854806643...| 409.7709044427993|
|[31.4459724827577...| 481.8730902491902|
|[31.6548096756927...| 468.4723410067181|
|[31.7207699002873...|  546.041497550772|
|[31.7242025238451...| 509.4393382414971|
|[31.7366356860502...|495.06434729515104|
|[31.8186165667690...|449.02893463390546|
|[31.8293464559211...| 384.4635954630942|
|[31.8530748017465...| 461.8242948671534|
|[31.9365486184489...|441.22622448596917|
+--------------------+------------

In [206]:
from pyspark.sql  import SparkSession   

In [207]:
spark = SparkSession.builder.appName('cruise').getOrCreate()

In [208]:
fd = spark.read.csv('C://Users//VPraveenK//Downloads//cruise_ship_info.csv',inferSchema=True,header=True)

In [209]:
fd.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [210]:
for ship in fd.head(5):
    print(ship)
    print('\n')

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)


Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)


Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7)


Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, Tonnage=110.0, passengers=29.74, length=9.53, cabins=14.88, passenger_density=36.99, crew=19.1)


Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, Tonnage=101.353, passengers=26.42, length=8.92, cabins=13.21, passenger_density=38.36, crew=10.0)




In [211]:
fd.groupBy('Cruise_line').count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



In [212]:
from pyspark.ml.feature import StringIndexer

In [213]:
indexer = StringIndexer(inputCol='Cruise_line',outputCol='cruise_cat')
indexed = indexer.fit(fd).transform(fd)
indexed.head(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)]

In [214]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler  

In [215]:
indexed.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'cruise_cat']

In [216]:
assembler = VectorAssembler(inputCols=['Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'cruise_cat'], outputCol='features')

In [217]:
output = assembler.transform(indexed)

In [218]:
output.select('features','crew').show()

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
|[22.0,70.367,20.5...| 9.2|
|[15.0,70.367,20.5...| 9.2|
|[23.0,70.367,20.5...| 9.2|
|[19.0,70.367,20.5...| 9.2|
|[6.0,110.23899999...|11.5|
|[10.0,110.0,29.74...|11.6|
|[28.0,46.052,14.5...| 6.6|
|[18.0,70.367,20.5...| 9.2|
|[17.0,70.367,20.5...| 9.2|
|[11.0,86.0,21.24,...| 9.3|
|[8.0,110.0,29.74,...|11.6|
|[9.0,88.5,21.24,9...|10.3|
|[15.0,70.367,20.5...| 9.2|
|[12.0,88.5,21.24,...| 9.3|
|[20.0,70.367,20.5...| 9.2|
+--------------------+----+
only showing top 20 rows



In [219]:
final_data = output.select(['features','crew'])

In [220]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [221]:
train_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               111|
|   mean|7.9590990990991095|
| stddev|3.5941137609226725|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



In [222]:
test_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                47|
|   mean| 7.404680851063829|
| stddev|3.2836630202735932|
|    min|              0.59|
|    max|              13.6|
+-------+------------------+



In [223]:
from pyspark.ml.regression  import LinearRegression 

In [224]:
ship_lr = LinearRegression(labelCol='crew')

In [225]:
trained_ship_model = ship_lr.fit(train_data)

In [226]:
ship_results = trained_ship_model.evaluate(test_data)

In [227]:
ship_results.rootMeanSquaredError

0.8924316268634299

In [228]:
ship_results.r2

0.9245302749528794

In [229]:
ship_results.meanAbsoluteError

0.6145323916084757

In [230]:
ship_results.meanSquaredError

0.7964342086261081

corr - Pearson Correlation (to know how two columns are related)

In [231]:
from pyspark.sql.functions import corr

In [232]:
fd.describe().show()

+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|summary|Ship_name|Cruise_line|               Age|           Tonnage|       passengers|           length|            cabins|passenger_density|             crew|
+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  count|      158|        158|               158|               158|              158|              158|               158|              158|              158|
|   mean| Infinity|       null|15.689873417721518| 71.28467088607599|18.45740506329114|8.130632911392404| 8.830000000000005|39.90094936708861|7.794177215189873|
| stddev|     null|       null| 7.615691058751413|37.229540025907866|9.677094775143416|1.793473548054825|4.4714172221480615| 8.63921711391542|3.503486564627034|
|    min|Adventure|    Azamara|   

In [233]:
fd.select(corr('crew','passengers')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+



LOGISTIC REGRESSION

In [234]:
from pyspark.sql import  SparkSession

In [235]:
spark = SparkSession.builder.appName('mylogreg').getOrCreate()

In [236]:
from pyspark.ml.classification import LogisticRegression

In [237]:
my_data = spark.read.format('libsvm').load('C://Users//VPraveenK//Downloads//sample_libsvm_data.txt')

In [238]:
my_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [239]:
my_log_reg_model = LogisticRegression()

In [240]:
fitted_logreg = my_log_reg_model.fit(my_data)

In [241]:
log_summary = fitted_logreg.summary

In [242]:
log_summary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [243]:
log_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[20.3777627514872...|[0.99999999858729...|       0.0|
|  1.0|(692,[158,159,160...|[-21.114014198868...|[6.76550380000472...|       1.0|
|  1.0|(692,[124,125,126...|[-23.743613234676...|[4.87842678716177...|       1.0|
|  1.0|(692,[152,153,154...|[-19.192574012720...|[4.62137287298144...|       1.0|
|  1.0|(692,[151,152,153...|[-20.125398874699...|[1.81823629113068...|       1.0|
|  0.0|(692,[129,130,131...|[20.4890549504196...|[0.99999999873608...|       0.0|
|  1.0|(692,[158,159,160...|[-21.082940212814...|[6.97903542823766...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.622713503550...|[3.00582577446132...|       1.0|
|  0.0|(692,[154,155,156...|[21.1594863606582...|[0.99999999935352...|       0.0|
|  0.0|(692,[127

In [244]:
lr_train,lr_test = my_data.randomSplit([0.7,0.3])

In [245]:
final_model = LogisticRegression()

In [246]:
fit_final = final_model.fit(lr_train)

In [247]:
prediction_and_labels = fit_final.evaluate(lr_test)

In [248]:
prediction_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[124,125,126...|[35.0303933395005...|[0.99999999999999...|       0.0|
|  0.0|(692,[126,127,128...|[31.1375195184434...|[0.99999999999997...|       0.0|
|  0.0|(692,[126,127,128...|[39.7873051466212...|           [1.0,0.0]|       0.0|
|  0.0|(692,[126,127,128...|[22.2926013564879...|[0.99999999979181...|       0.0|
|  0.0|(692,[126,127,128...|[27.2887474253395...|[0.99999999999859...|       0.0|
|  0.0|(692,[127,128,129...|[29.3214713938755...|[0.99999999999981...|       0.0|
|  0.0|(692,[129,130,131...|[17.0523057733940...|[0.99999996071039...|       0.0|
|  0.0|(692,[150,151,152...|[22.4221975634996...|[0.99999999981712...|       0.0|
|  0.0|(692,[152,153,154...|[12.9820186113752...|[0.99999769866441...|       0.0|
|  0.0|(692,[155

In [249]:
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)

In [250]:
my_eval = BinaryClassificationEvaluator()

In [251]:
my_final_roc = my_eval.evaluate(prediction_and_labels.predictions)

In [252]:
my_final_roc

1.0

In [253]:
eff = spark.read.csv('C://Users//VPraveenK//Downloads//titanic.csv', inferSchema=True, header=True)

In [254]:
eff.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [255]:
eff.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [256]:
my_cols = eff.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'
])

In [257]:
my_final_data = my_cols.na.drop()  #It will fill the missing data

In [258]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)
# stringIndexer - Converts string to number
#One hot encoder = It transforms the actual numbers for the categories into a one hot encoding where you have an array indicating kind of Zeros and Ones of what the actual category was

In [259]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [260]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

In [261]:
assembler = VectorAssembler(inputCols= ['Pclass','SexVec','EmbarkVec','Age','SibSp','Parch','Fare'],
                            outputCol='features')

In [262]:
from pyspark.ml.classification import  LogisticRegression   

In [263]:
from pyspark.ml import Pipeline

In [264]:
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

In [265]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
                            gender_encoder,embark_encoder,
                            assembler,log_reg_titanic])

In [266]:
train_data, test_data = my_final_data.randomSplit([0.7,0.3])

In [267]:
fit_model = pipeline.fit(train_data)

In [268]:
results = fit_model.transform(test_data)

In [269]:
from pyspark.ml.evaluation import  BinaryClassificationEvaluator

In [270]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived')

In [271]:
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [272]:
AUC = my_eval.evaluate(results)
# Area under the Curve

In [273]:
AUC 

0.7648129423660263

Consulting Project 

In [274]:
from pyspark.sql import SparkSession    

In [275]:
spark = SparkSession.builder.appName("logregconsult").getOrCreate()

In [276]:
data = spark.read.csv('C://Users//VPraveenK//Downloads//customer_churn.csv',inferSchema=True,header=True)

In [277]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [278]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [279]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [280]:
from pyspark.ml.feature  import VectorAssembler 

In [281]:
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'], outputCol='features')

In [282]:
output = assembler.transform(data)

In [283]:
final_data = output.select('features','churn')

In [284]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

In [285]:
from pyspark.ml.classification import LogisticRegression

In [286]:
lr_churn = LogisticRegression(labelCol='churn')

In [287]:
fitted_churn_model = lr_churn.fit(train_churn)

In [288]:
training_sum = fitted_churn_model.summary

In [289]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                638|                638|
|   mean|0.18495297805642633|0.15047021943573669|
| stddev|0.38856405251027176|  0.357812249457894|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [290]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [291]:
pred_and_labels = fitted_churn_model.evaluate(test_churn)

In [292]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|    0|[5.11953171422319...|[0.99405671182543...|       0.0|
|[25.0,9672.03,0.0...|    0|[5.08870890385733...|[0.99387181064852...|       0.0|
|[28.0,8670.98,0.0...|    0|[8.14488868269047...|[0.99970986924449...|       0.0|
|[28.0,9090.43,1.0...|    0|[1.76347375311074...|[0.85364418964836...|       0.0|
|[29.0,10203.18,1....|    0|[4.02722156929167...|[0.98248834065511...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.37300382321777...|[0.96685010177333...|       0.0|
|[30.0,8403.78,1.0...|    0|[6.17607886668028...|[0.99792574924342...|       0.0|
|[30.0,8677.28,1.0...|    0|[4.31686011322016...|[0.98683394833331...|       0.0|
|[30.0,10744.14,1....|    1|[1.93487658554946...|[0.87378820236786...|       0.0|
|[31.0,5304.6,0.

In [293]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='churn')

In [294]:
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [295]:
auc

0.7282608695652175

In [296]:
# Predict on New Data

final_lr_model = lr_churn.fit(final_data)

In [297]:
new_customers = spark.read.csv('C://Users//VPraveenK//Downloads//new_customers.csv', inferSchema=True, header=True)

In [298]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [299]:
test_new_customers = assembler.transform(new_customers)

In [300]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [301]:
final_results = final_lr_model.transform(test_new_customers)

In [302]:
final_results.select('Company', 'prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



TREE METHODS DOCUMENTATION 

In [303]:
from pyspark.sql import  SparkSession   
spark = SparkSession.builder.appName('mytree').getOrCreate()

In [304]:
from pyspark.ml import  pipeline

In [305]:
from pyspark.ml.classification import (RandomForestClassifier,GBTClassifier,DecisionTreeClassifier)

In [306]:
data = spark.read.format('libsvm').load('C://Users//VPraveenk//Downloads//sample_libsvm_data.txt')

In [307]:
data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [308]:
train_data,test_data = data.randomSplit([0.7,0.3])

In [309]:
dtc = DecisionTreeClassifier()
rfc =RandomForestClassifier(numTrees=100) #more trees => more computational time
gbt = GBTClassifier()

In [310]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [311]:
dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbt_preds = gbt_model.transform(test_data)

In [312]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [313]:
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

In [314]:
print('DTC ACCURACY:')
acc_eval.evaluate(dtc_preds)

DTC ACCURACY:


1.0

In [315]:
print('RFC ACCURACY:')
acc_eval.evaluate(rfc_preds)

RFC ACCURACY:


1.0

In [316]:
print('GBT ACCURACY:')
acc_eval.evaluate(gbt_preds)

GBT ACCURACY:


1.0

Decision Tree and Random Forest

In [317]:
from pyspark.sql import  SparkSession

In [318]:
spark = SparkSession.builder.appName('tree').getOrCreate()

In [319]:
data = spark.read.csv('C://Users//VPraveenK//Downloads//College.csv',inferSchema=True,header=True)

In [320]:
data.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)



In [321]:
data.head(1)

[Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60)]

In [322]:
from pyspark.ml.feature import VectorAssembler      

In [323]:
data.columns

['School',
 'Private',
 'Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate']

In [324]:
assembler = VectorAssembler(inputCols=['Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate'],outputCol='features')

In [325]:
output = assembler.transform(data)

In [326]:
from pyspark.ml.feature import StringIndexer

In [327]:
indexer = StringIndexer(inputCol='Private',outputCol='PrivateIndex')

In [328]:
output_fixed = indexer.fit(output).transform(output)

In [329]:
output_fixed.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- PrivateIndex: double (nullable = false)



In [330]:
final_data = output_fixed.select('features','PrivateIndex')

In [331]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [332]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier

In [333]:
from pyspark.ml.regression import RandomForestRegressor

In [334]:
dtc = DecisionTreeClassifier(labelCol='PrivateIndex',featuresCol='features')
rfc = RandomForestClassifier(labelCol='PrivateIndex',featuresCol='features')
gbt = GBTClassifier(labelCol='PrivateIndex',featuresCol='features')

In [335]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [336]:
dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbt_preds = gbt_model.transform(test_data)

In [337]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [338]:
my_binary_eval = BinaryClassificationEvaluator(labelCol='PrivateIndex')

In [339]:
print('DTC')
print(my_binary_eval.evaluate(dtc_preds))

DTC
0.9070291522823168


In [340]:
print('RFC')
print(my_binary_eval.evaluate(rfc_preds))

RFC
0.9627445339470658


In [341]:
dtc_preds.printSchema()

root
 |-- features: vector (nullable = true)
 |-- PrivateIndex: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [342]:
rfc_preds.printSchema()

root
 |-- features: vector (nullable = true)
 |-- PrivateIndex: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [343]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [344]:
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='PrivateIndex')

In [345]:
rfc_acc = acc_eval.evaluate(rfc_preds)

In [346]:
rfc_acc

0.9330357142857143

RANDOM FOREST PROJECT

In [347]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_consult').getOrCreate()

In [348]:
data = spark.read.csv('C://Users//VPraveenK//Downloads//dog_food.csv',inferSchema=True,header=True )

In [349]:
data.head(1)

[Row(A=4, B=2, C=12.0, D=3, Spoiled=1.0)]

In [350]:
from pyspark.ml.feature import VectorAssembler

In [351]:
data.columns

['A', 'B', 'C', 'D', 'Spoiled']

In [352]:
assembler = VectorAssembler(inputCols=['A','B','C','D'],outputCol='features')

In [353]:
output = assembler.transform(data)

In [354]:
from pyspark.ml.classification import RandomForestClassifier

In [355]:
rfc = RandomForestClassifier(labelCol='Spoiled',featuresCol='features')

In [356]:
output.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: double (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)
 |-- features: vector (nullable = true)



In [357]:
final_data = output.select(['features','Spoiled'])

In [358]:
final_data.show()

+-------------------+-------+
|           features|Spoiled|
+-------------------+-------+
| [4.0,2.0,12.0,3.0]|    1.0|
| [5.0,6.0,12.0,7.0]|    1.0|
| [6.0,2.0,13.0,6.0]|    1.0|
| [4.0,2.0,12.0,1.0]|    1.0|
| [4.0,2.0,12.0,3.0]|    1.0|
|[10.0,3.0,13.0,9.0]|    1.0|
| [8.0,5.0,14.0,5.0]|    1.0|
| [5.0,8.0,12.0,8.0]|    1.0|
| [6.0,5.0,12.0,9.0]|    1.0|
| [3.0,3.0,12.0,1.0]|    1.0|
| [9.0,8.0,11.0,3.0]|    1.0|
|[1.0,10.0,12.0,3.0]|    1.0|
|[1.0,5.0,13.0,10.0]|    1.0|
|[2.0,10.0,12.0,6.0]|    1.0|
|[1.0,10.0,11.0,4.0]|    1.0|
| [5.0,3.0,12.0,2.0]|    1.0|
| [4.0,9.0,11.0,8.0]|    1.0|
| [5.0,1.0,11.0,1.0]|    1.0|
|[4.0,9.0,12.0,10.0]|    1.0|
| [5.0,8.0,10.0,9.0]|    1.0|
+-------------------+-------+
only showing top 20 rows



In [359]:
rfc_model = rfc.fit(final_data)

In [360]:
final_data.head(1)

[Row(features=DenseVector([4.0, 2.0, 12.0, 3.0]), Spoiled=1.0)]

In [361]:
rfc_model.featureImportances
# 2 as more value => it is the one that causes spoilage quickly

SparseVector(4, {0: 0.0246, 1: 0.0287, 2: 0.9204, 3: 0.0264})

CLUSTERING

In [362]:
from pyspark.sql import SparkSession

In [363]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [425]:
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [426]:
dataset = spark.read.format('libsvm').load("C://Users//VPraveenK//Downloads//sample_kmeans_data.txt")

In [427]:
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [428]:
means = BisectingKMeans().setK(2).setSeed(1)
#means = KMeans().setK(2).setSeed(1)
# In KMeans there is no computeCost

In [429]:
model = means.fit(final_data)

In [430]:
#wse = ClusteringEvaluator()
#wsse = wse.evaluate(model)
#print(str(wsse))


In [431]:
wse = model.computeCost(final_data)
print(wse)

#c:\Users\VPraveenK\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\ml\clustering.py:974: FutureWarning: Deprecated in 3.0.0. 
# It will be removed in future versions. Use ClusteringEvaluator instead. You can also get the cost on the training dataset in the summary.
  #warnings.warn(

10970.62643097642


In [432]:
centers = model.clusterCenters()

In [433]:
centers

[array([3.22222222, 5.63333333, 8.99259259, 5.95925926]),
 array([8.37272727, 5.34545455, 9.29090909, 5.11363636])]

In [434]:
results = model.transform(final_data)

In [435]:
results.show()

+-------------------+-------+----------+
|           features|Spoiled|prediction|
+-------------------+-------+----------+
| [4.0,2.0,12.0,3.0]|    1.0|         0|
| [5.0,6.0,12.0,7.0]|    1.0|         0|
| [6.0,2.0,13.0,6.0]|    1.0|         1|
| [4.0,2.0,12.0,1.0]|    1.0|         0|
| [4.0,2.0,12.0,3.0]|    1.0|         0|
|[10.0,3.0,13.0,9.0]|    1.0|         1|
| [8.0,5.0,14.0,5.0]|    1.0|         1|
| [5.0,8.0,12.0,8.0]|    1.0|         0|
| [6.0,5.0,12.0,9.0]|    1.0|         0|
| [3.0,3.0,12.0,1.0]|    1.0|         0|
| [9.0,8.0,11.0,3.0]|    1.0|         1|
|[1.0,10.0,12.0,3.0]|    1.0|         0|
|[1.0,5.0,13.0,10.0]|    1.0|         0|
|[2.0,10.0,12.0,6.0]|    1.0|         0|
|[1.0,10.0,11.0,4.0]|    1.0|         0|
| [5.0,3.0,12.0,2.0]|    1.0|         1|
| [4.0,9.0,11.0,8.0]|    1.0|         0|
| [5.0,1.0,11.0,1.0]|    1.0|         1|
|[4.0,9.0,12.0,10.0]|    1.0|         0|
| [5.0,8.0,10.0,9.0]|    1.0|         0|
+-------------------+-------+----------+
only showing top

In [436]:
from pyspark.sql import SparkSession    

In [437]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [438]:
dataset = spark.read.csv("C://Users//VPraveenK//Downloads//seeds_dataset.csv",inferSchema=True,header=True)

In [439]:
dataset.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [440]:
dataset.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)]

In [441]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [442]:
from pyspark.ml.feature import VectorAssembler

In [443]:
dataset.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [444]:
assembler = VectorAssembler(inputCols=dataset.columns,outputCol='features')

In [445]:
final_data = assembler.transform(dataset)

In [446]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



In [447]:
from pyspark.ml.feature import StandardScaler

In [448]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [449]:
scaler_model = scaler.fit(final_data)

In [450]:
final_data = scaler_model.transform(final_data)

In [451]:
final_data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22, features=DenseVector([15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]), scaledFeatures=DenseVector([5.2445, 11.3633, 36.8608, 13.0072, 8.7685, 1.4772, 10.621]))]

In [452]:
kmeans = KMeans(featuresCol='scaledFeatures',k=3)

In [453]:
model = kmeans.fit(final_data)

In [454]:
print('WSSSE') #Within Set Sum of Squared Errors
print(model.summary.trainingCost)
#print(model.computeCost(final_data))
# computeCost is removed from version 3

WSSSE
428.76536612896285


predictions = model.transform(finalDf)



from pyspark.ml.evaluation import ClusteringEvaluator



evaluator = ClusteringEvaluator(predictionCol='prediction',featuresCol='scaledFeatures',metricName='silhouette')



squaredEuclideanDistance = evaluator.evaluate(predictions)

squaredEuclideanDistance

In [455]:
centers = model.clusterCenters()

In [456]:
print(centers)

[array([ 4.078007  , 10.15076404, 35.87686106, 11.81860981,  7.5430707 ,
        3.17727834, 10.39174095]), array([ 6.32636687, 12.38115343, 37.39222755, 13.9206997 ,  9.75485787,
        2.41428142, 12.28078861]), array([ 4.9360523 , 10.94499696, 37.33487983, 12.40173794,  8.61516278,
        1.7804233 , 10.36535821])]


In [457]:
model.transform(final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         1|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         0|
+----------+
only showing top 20 rows



KMeans Clustering Project

In [458]:
from pyspark.sql import SparkSession

In [459]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [460]:
dataset = spark.read.csv("C://Users//VPraveenK//Downloads//hack_data.csv",header=True,inferSchema=True)

In [461]:
dataset.head()

Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)

In [462]:
from pyspark.ml.clustering import KMeans

In [463]:
from pyspark.ml.feature import VectorAssembler

In [464]:
dataset.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [465]:
feat_cols = ['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed']

In [466]:
assembler = VectorAssembler(inputCols=feat_cols,outputCol='features')

In [467]:
final_data = assembler.transform(dataset)

In [468]:
final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [469]:
from pyspark.ml.feature import StandardScaler

In [470]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [471]:
scaler_model = scaler.fit(final_data)
cluster_final_data = scaler_model.transform(final_data)

In [472]:
cluster_final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [473]:
# project was to find out 2 hackers or 3 hackers => k value is 2 and 3
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2)
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3)

In [474]:
model_k2 = kmeans2.fit(cluster_final_data)
model_k3 = kmeans3.fit(cluster_final_data)

In [475]:
model_k3.transform(cluster_final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         1|
|         0|
|         1|
|         1|
|         0|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         0|
|         0|
|         0|
|         1|
|         1|
|         1|
|         0|
|         1|
|         0|
+----------+
only showing top 20 rows



In [476]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   83|
|         2|  167|
|         0|   84|
+----------+-----+



In [477]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



so its even number of hackers 2

RECOMMENDER SYSTEM PROJECT

In [478]:
from pyspark.sql import SparkSession

In [479]:
spark = SparkSession.builder.appName('rec').getOrCreate()

In [480]:
from pyspark.ml.recommendation import ALS

In [481]:
from pyspark.ml.evaluation import RegressionEvaluator

In [482]:
data = spark.read.csv("C://Users//VPraveenK//Downloads//movielens_ratings.csv",inferSchema=True,header=True)

In [483]:
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [484]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [485]:
training,test = data.randomSplit([0.8,0.2])

In [486]:
als = ALS(maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',ratingCol='rating')

In [487]:
model = als.fit(training)

In [488]:
predictions = model.transform(test)

In [489]:
predictions.show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|   1.0|     3|0.46518952|
|      1|   1.0|    14| 2.5077426|
|      6|   3.0|    26| 1.4230827|
|      6|   2.0|    22| 2.2627974|
|      6|   1.0|    13| 0.7833752|
|      6|   2.0|    19| 0.9479556|
|      6|   1.0|    18| 0.3395951|
|      3|   1.0|    26| 1.1042798|
|      3|   1.0|     7|  2.227077|
|      3|   1.0|    21| 1.5038506|
|      3|   1.0|     0|0.21811469|
|      5|   1.0|     9| 1.0695455|
|      4|   3.0|     2| 1.6675051|
|      2|   2.0|     1| 1.9115146|
|      2|   3.0|     6|-0.6036116|
|      2|   3.0|     9| 1.5756217|
|      2|   1.0|    25| -4.571329|
|      0|   1.0|    26|  2.033407|
|      0|   1.0|     3|  0.916678|
|      0|   1.0|    15| 1.5901301|
+-------+------+------+----------+
only showing top 20 rows



In [490]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')

In [491]:
#root mean square error
rmse = evaluator.evaluate(predictions)

In [492]:
print('RMSE')
print(rmse)

RMSE
1.4792210078118262


In [493]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])

In [494]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     20|    11|
|     25|    11|
|     39|    11|
|     41|    11|
|     67|    11|
|     71|    11|
|     72|    11|
|     80|    11|
|     89|    11|
|     94|    11|
+-------+------+



In [495]:
recommendations = model.transform(single_user)

In [496]:
recommendations.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     80|    11|  4.060323|
|     20|    11|  2.632135|
|     39|    11| 2.2787237|
|     71|    11| 2.1559207|
|     72|    11|  2.011573|
|     94|    11| 1.7086428|
|     89|    11| 1.0780467|
|     41|    11| 0.8462488|
|     67|    11|0.58480716|
|     25|    11|-1.4404882|
+-------+------+----------+



NATURAL LANGUAGE PROCESSING

NLP TOOLS

In [497]:
# This is VERY IMPORTANT for show() to work
import findspark
findspark.init()

In [498]:
from pyspark.sql import SparkSession    

In [499]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [500]:
# Tokenization is taking a sentence and breaking it into individual terms
from pyspark.ml.feature import Tokenizer,RegexTokenizer

In [501]:
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType

In [502]:
sen_df = spark.createDataFrame([(0,'Hi I heard about spark'),(1,'I wish java could use case classes'),(2,'Logistic,regression')],['id','sentence'])

In [503]:
sen_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish java could...|
|  2| Logistic,regression|
+---+--------------------+



In [504]:
tokenizer = Tokenizer(inputCol='sentence',outputCol='words')

In [505]:
regex_tokenizer = RegexTokenizer(inputCol='sentence',outputCol='words',pattern='\\W')

In [506]:
count_tokens = udf(lambda words:len(words),IntegerType())

In [507]:
tokenized = tokenizer.transform(sen_df)

In [508]:
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish java could...|[i, wish, java, c...|
|  2| Logistic,regression|[logistic,regress...|
+---+--------------------+--------------------+



In [509]:
rg_tokenized = regex_tokenizer.transform(sen_df)

In [510]:
rg_tokenized.withColumn('tokens',count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish java could...|[i, wish, java, c...|     7|
|  2| Logistic,regression|[logistic, regres...|     2|
+---+--------------------+--------------------+------+



In [511]:
from pyspark.ml.feature import StopWordsRemover

In [512]:
sentenceDataFrame = spark.createDataFrame([
    (0,['I','saw','the','green','horse']),
    (1,['Mary','had','a','little','lamb'])
],['id','tokens'])

In [513]:
sentenceDataFrame.show()

+---+--------------------+
| id|              tokens|
+---+--------------------+
|  0|[I, saw, the, gre...|
|  1|[Mary, had, a, li...|
+---+--------------------+



In [514]:
remover = StopWordsRemover(inputCol='tokens',outputCol='filtered')

In [515]:
remover.transform(sentenceDataFrame).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, gre...| [saw, green, horse]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



In [516]:
from pyspark.ml.feature import NGram

In [517]:
wordDataFrame = spark.createDataFrame([(0,['Hi ','I' ,'heard' ,'about' ,'spark']),(1,['I ','wish' ,'java' ,'could',' use' ,'case' ,'classes']),(2,['Logistic','regression'])],['id','words'])

In [518]:
ngram = NGram(n=2, inputCol='words',outputCol='grams')

In [519]:
ngram.transform(wordDataFrame).select('grams').show(truncate=False)

+---------------------------------------------------------------------+
|grams                                                                |
+---------------------------------------------------------------------+
|[Hi  I, I heard, heard about, about spark]                           |
|[I  wish, wish java, java could, could  use,  use case, case classes]|
|[Logistic regression]                                                |
+---------------------------------------------------------------------+



NLP tools

In [520]:
from pyspark.sql  import SparkSession

In [521]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [522]:
from pyspark.ml.feature import HashingTF,IDF,Tokenizer

In [523]:
sentenceData = spark.createDataFrame([
                (0.0, "Hi I heard about spark"),
                (0.0, "I wish Java could use case classes"),
                (1.0, "Logistic regression models are neat")
], ["label","sentence"])

In [524]:
sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+



In [525]:
tokenizer = Tokenizer(inputCol='sentence',outputCol='words')

In [526]:
words_data = tokenizer.transform(sentenceData)

In [527]:
words_data.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [528]:
words_data.show(truncate=False)

+-----+-----------------------------------+------------------------------------------+
|label|sentence                           |words                                     |
+-----+-----------------------------------+------------------------------------------+
|0.0  |Hi I heard about spark             |[hi, i, heard, about, spark]              |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+



In [529]:
hashing_tf = HashingTF(inputCol='words',outputCol='rawFeatures')

In [530]:
featurized_data = hashing_tf.transform(words_data)

In [531]:
idf = IDF(inputCol='rawFeatures',outputCol='features')

In [532]:
idf_model = idf.fit(featurized_data)

In [533]:
rescaled_data = idf_model.transform(featurized_data)

In [534]:
rescaled_data.select('label','features').show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                      |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0  |(262144,[18700,19036,33808,66273,173558],[0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                                   |
|0.0  |(262144,[19036,20719,55551,58672,98717,109547,192310],[0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])|
|1.0 

In [535]:
from pyspark.ml.feature import CountVectorizer

In [536]:
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

In [537]:
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [538]:
cv = CountVectorizer(inputCol='words',outputCol='features',
                    vocabSize=3,minDF=2.0)

In [539]:
model = cv.fit(df)

In [540]:
result = model.transform(df)

In [541]:
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



NLP Project

In [542]:
from pyspark.sql import SparkSession

In [543]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [544]:
data = spark.read.csv("C://Users//VPraveenK//Downloads//SMSSpamCollection",inferSchema=True,sep='\t')

In [545]:
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [546]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [547]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [548]:
from pyspark.sql.functions import length

In [549]:
data = data.withColumn('length',length(data['text']))

In [550]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [551]:
data.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [552]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer

In [553]:
tokenizer = Tokenizer(inputCol='text',outputCol='token_text')
stop_remove = StopWordsRemover(inputCol='token_text',outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token',outputCol='c_vec')
idf = IDF(inputCol='c_vec',outputCol='tf_idf')
ham_spam_to_numeric = StringIndexer(inputCol='class',outputCol='label')

In [554]:
from pyspark.ml.feature import  VectorAssembler

In [555]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='feature')

In [556]:
from pyspark.ml.classification import NaiveBayes

In [557]:
nb = NaiveBayes()

In [558]:
from pyspark.ml import Pipeline

In [559]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_numeric,tokenizer,stop_remove,count_vec,idf,clean_up])

In [560]:
cleaner = data_prep_pipe.fit(data)

In [561]:
clean_data = cleaner.transform(data)

In [562]:
clean_data.columns

['class',
 'text',
 'length',
 'label',
 'token_text',
 'stop_token',
 'c_vec',
 'tf_idf',
 'feature']

In [568]:
clean_data = clean_data.select('label','feature')

In [564]:
clean_data.show()

+-----+--------------------+
|label|             feature|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [565]:
training,test = clean_data.randomSplit([0.7,0.3])

FOR ALL BELOW ERRORS MAYBE WE SHOULD USE JAVA V8.0

In [566]:
spam_detector = nb.fit(training)

IllegalArgumentException: features does not exist. Available: label, feature

In [569]:
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [570]:
test_results = spam_detector.transform(test)

NameError: name 'spam_detector' is not defined

In [571]:
test_results.show()

AttributeError: 'LinearRegressionSummary' object has no attribute 'show'

In [572]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [573]:
acc_eval = MulticlassClassificationEvaluator()

In [574]:
acc = acc_eval.evaluate(test_results)

AttributeError: 'LinearRegressionSummary' object has no attribute '_jdf'

In [575]:
print('ACC of NB Model')
print(acc)

ACC of NB Model


NameError: name 'acc' is not defined

SPARK STREAMING

In [597]:
spark.stop()

In [598]:
import findspark
findspark.init()

In [599]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [600]:
from pyspark.streaming import StreamingContext

In [601]:
sc = SparkContext('local[2]','NetworkWordCount')

In [602]:
ssc = StreamingContext(sc,1)

In [603]:
lines = ssc.socketTextStream('localhost',9999)

In [604]:
words = lines.flatMap(lambda line: line.split(" "))

In [605]:
pairs = words.map(lambda word:(word,1))

In [606]:
word_counts = pairs.reduceByKey(lambda num1,num2: num1+num2)

In [607]:
word_counts.pprint()

In [608]:
ssc.start()

-------------------------------------------
Time: 2023-01-03 01:18:32
-------------------------------------------

-------------------------------------------
Time: 2023-01-03 01:18:33
-------------------------------------------



In [609]:
ssc.stop()

-------------------------------------------
Time: 2023-01-03 01:18:34
-------------------------------------------

