### Connect to Spark

In [1]:
# Run findspark.init() before the pyspark imports in the following cells.
# Presumably it locates the local Spark install and makes pyspark importable — TODO confirm.
import findspark
findspark.init()

### Create a SparkSession

In [2]:
from pyspark.sql import SparkSession

In [4]:
# Get or create the session named 'basics'; every read in this notebook goes through it.
spark = (
    SparkSession.builder
    .appName('basics')
    .getOrCreate()
)

Exception: Java gateway process exited before sending its port number

### File Reading

In [5]:
# Load the sample people file; no schema is given, so Spark infers the column types.
# Raw string: the Windows path contains backslash sequences (\S, \p) that are not
# valid escapes and trigger an invalid-escape warning in a normal string literal.
df = spark.read.json(r'E:\Spark\people.json')

NameError: name 'spark' is not defined

In [8]:
# A bare DataFrame name echoes its column names and types, not its rows.
df

DataFrame[age: bigint, name: string]

In [9]:
# Print the rows of the people DataFrame as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [6]:
# Print the inferred schema (column name, type, nullability) as a tree.
df.printSchema()

NameError: name 'df' is not defined

In [11]:
# Summary statistics (count, mean, stddev, min, max) for every column.
(
    df
    .describe()
    .show()
)

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [12]:
# Schema as inferred from the JSON file: age comes back as long, name as string.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### Defining Schema While Reading Data

In [13]:
from pyspark.sql.types import StructField,StringType,StructType,IntegerType

In [14]:
# Explicit column definitions: StructField(name, type, nullable) for each field.
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
]

In [15]:
# Wrap the field list in a StructType so it can be passed as a reader schema.
final_struct = StructType(fields=data_schema)

In [16]:
# Re-read the people file, this time with the explicit schema instead of inference.
# Raw string: the Windows path contains backslash sequences (\S, \p) that are not
# valid escapes and trigger an invalid-escape warning in a normal string literal.
df = spark.read.json(r'E:\Spark\people.json', schema=final_struct)

In [17]:
# Display the rows loaded with the explicit schema.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Inspect the schema of the DataFrame that was read with the explicit StructType.
df.printSchema()

## CSV File Reading

In [25]:
# Load the sales CSV: the first line is the header, and column types are inferred.
# Raw string: the Windows path contains backslash sequences (\S, \D, \s) that are
# not valid escapes and trigger an invalid-escape warning in a normal string literal.
dfv = spark.read.csv(
    r'E:\Spark\Data\sales_info.csv',
    header=True,
    inferSchema=True,
)

In [26]:
# Print the sales rows (Company, Person, Sales) as a text table.
dfv.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [27]:
# Check the inferred column types of the sales DataFrame.
dfv.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [28]:
# Fetch the first ten rows as a list of Row objects and pick the third one (index 2).
dfv.head(10)[2]

Row(Company='GOOG', Person='Frank', Sales=340.0)

## Filter Operation

In [40]:
# Rows where Sales exceeds 200 and Company is 'APPL', collected into a list of Row objects.
# Each comparison is parenthesised because & binds tighter than > and ==.
dtr = dfv.filter(
    (dfv['Sales']>200) & (dfv['Company']=='APPL')
).collect()


In [36]:
# Exact-match variant of the filter: Company 'APPL' with Sales equal to 130.
# Bound to its own name (a DataFrame, not collected) so that it does not overwrite
# the collected `dtr` list shown by the following cell when the notebook is run
# top to bottom.
dtr_exact = dfv.filter(
    (dfv['Sales']==130) & (dfv['Company']=='APPL')
)

In [41]:
# Echo the filter result currently bound to `dtr`.
dtr

[Row(Company='APPL', Person='John', Sales=250.0),
 Row(Company='APPL', Person='Mike', Sales=750.0),
 Row(Company='APPL', Person=' Chris', Sales=350.0)]

## Group By

In [51]:
# Aggregations demonstrated in this section: mean, max, min, count, sum.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [43]:
# Show the sales rows again as the reference for the grouped results that follow.
dfv.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [46]:
# Per-company total; sum() with no arguments covers the numeric columns (here only Sales).
(
    dfv
    .groupBy('Company')
    .sum()
    .show()
)

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [47]:
# Number of rows per company.
(
    dfv
    .groupBy('Company')
    .count()
    .show()
)

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [48]:
# Smallest Sales value per company.
(
    dfv
    .groupBy('Company')
    .min()
    .show()
)

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [49]:
# Largest Sales value per company.
(
    dfv
    .groupBy('Company')
    .max()
    .show()
)

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [50]:
# Average Sales per company; the result column is labelled avg(Sales).
(
    dfv
    .groupBy('Company')
    .mean()
    .show()
)

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



## Aggregations

In [52]:
# Whole-table aggregate (no grouping): the dict maps a column name to an aggregate name.
dfv.agg(
    {'Sales': 'min'}
).show()

+----------+
|min(Sales)|
+----------+
|     120.0|
+----------+



In [53]:
# Largest Sales value across the whole table.
dfv.agg(
    {'Sales': 'max'}
).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [54]:
# Count of Sales values across the whole table.
dfv.agg(
    {'Sales': 'count'}
).show()

+------------+
|count(Sales)|
+------------+
|          12|
+------------+



In [55]:
# Average of Sales across the whole table; the result column is labelled avg(Sales).
dfv.agg(
    {'Sales': 'mean'}
).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



### Aggregate Functions (SQL)

In [56]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [57]:
# Number of distinct companies in the sales data.
(
    dfv
    .select(countDistinct('Company'))
    .show()
)

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+



In [62]:
# Distinct-person count, with alias() giving the result column a readable name.
dfP = dfv.select(
    countDistinct('Person').alias('PersonCount')
)

In [63]:
# Display the single-row result holding the aliased PersonCount column.
dfP.show()

+-----------+
|PersonCount|
+-----------+
|         12|
+-----------+



In [59]:
# Average of Sales via the SQL function instead of the agg() dict form.
(
    dfv
    .select(avg('Sales'))
    .show()
)

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [60]:
# Standard deviation of Sales; the result column is labelled stddev_samp(Sales).
(
    dfv
    .select(stddev('Sales'))
    .show()
)

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [64]:
from pyspark.sql.functions import format_number


In [66]:
# Same standard deviation, formatted to two decimal places and given a short column name.
(
    dfv
    .select(
        format_number(stddev('Sales'), 2).alias('std_sales')
    )
    .show()
)

+---------+
|std_sales|
+---------+
|   250.09|
+---------+

