### Imports

In [0]:
from pyspark.sql import SparkSession

In [0]:
 spark = SparkSession.builder.appName('Basics').getOrCreate()

In [0]:
# Load people.json; no schema is given, so Spark infers column types from the data
df1 = (
    spark.read
    .format("json")
    .load("dbfs:/FileStore/shared_uploads/veeraraghavan.cv@gmail.com/people.json")
)

In [0]:
# Print the DataFrame's rows as a text table (up to 20 rows, long values truncated)
df1.show(n=20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
# Print the schema tree: each column's name, Spark type and nullability
df1.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [0]:
# Column names as a Python list (same list that df1.columns returns)
df1.schema.names

Out[7]: ['age', 'name']

In [0]:
# Summary statistics (count, mean, stddev, min, max) for each column
df1.describe().show(truncate=True)

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



### Defining your own schema when reading the data

In [0]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType 

In [0]:
# One StructField per column: name, Spark type, and whether nulls are allowed
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [0]:
# Wrap the field list into a StructType that the reader can accept as a schema
final_struct = StructType(data_schema)

In [0]:
# Re-read the same file, this time forcing the explicit schema instead of inference
df2 = (
    spark.read
    .format("json")
    .schema(final_struct)
    .load("dbfs:/FileStore/shared_uploads/veeraraghavan.cv@gmail.com/people.json")
)

In [0]:
# Confirm the explicit schema took effect: age is now integer rather than the inferred long
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### Basics

In [0]:
# Indexing a DataFrame by name yields a Column expression, not the column's data
df2["age"]

Out[14]: Column<'age'>

In [0]:
# To see a column's values you have to select it and then call show();
# indexing alone (previous cell) only returns a Column expression
df2.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [0]:
# Fetch the first two rows to the driver and look at the first one (a Row object)
df2.take(2)[0]

Out[16]: Row(age=None, name='Michael')

In [0]:
# The type of a collected row
type(df2.take(2)[0])

Out[17]: pyspark.sql.types.Row

In [0]:
# select accepts several column names as separate arguments as well as a list
df2.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
# select is a transformation: it returns another DataFrame, not data
type(df2.select('age', 'name'))

Out[19]: pyspark.sql.dataframe.DataFrame

In [0]:
# withColumn(name, expression): the new column's name first, then the Column
# expression that computes it (null ages stay null after doubling)
df2.withColumn('newage', df2['age'] * 2).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    60|
|  19| Justin|    38|
+----+-------+------+



In [0]:
# withColumn returned a new DataFrame; df2 itself still has only its original columns
df2.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
# Rename a column (again returns a new DataFrame; df2 is unchanged)
df2.withColumnRenamed(existing='age', new='mynewage').show()

+--------+-------+
|mynewage|   name|
+--------+-------+
|    null|Michael|
|      30|   Andy|
|      19| Justin|
+--------+-------+



### Using SQL commands

In [0]:
# Register df2 as a temporary view so SQL queries can refer to it as `people`
df2.createOrReplaceTempView(name='people')

In [0]:
# Query the temporary view with plain SQL; the result is a DataFrame
result = spark.sql("select * from people")

In [0]:
# Display the SQL query result
result.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
# Filter rows in SQL with a WHERE clause
new_result = spark.sql(
    "select * from people where name='Justin'"
)

In [0]:
# Display the filtered rows
new_result.show(n=20)

+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+



### Basic operations

In [0]:
from pyspark.sql import session

In [0]:
# getOrCreate() hands back the session that already exists in this notebook;
# NOTE(review): the 'ops' app name is presumably not applied to it — confirm on this runtime
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [0]:
# Load the Apple stock CSV: first line is the header, column types are inferred.
# Each option is given once (previously `header` was set twice), and `inferSchema`
# is spelled as in the sales cell further down.
df1 = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/veeraraghavan.cv@gmail.com/appl_stock.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Inferred schema: Date stays a string, prices are double, Volume is integer
df1.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [0]:
# Preview the stock data (first 20 rows)
df1.show(n=20, truncate=True)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [0]:
# First of the first three rows, as a Row object
df1.take(3)[0]

Out[33]: Row(Date='2010-01-04', Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [0]:
# Closing price greater than 500, written as a SQL expression string
df1.filter('Close > 500').show()

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

In [0]:
# Chain a projection after the filter to keep only two columns
df1.filter('Close > 500').select('Open', 'Close').show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        499.529991|502.60002099999997|
|        504.659988|        509.459991|
|        491.500008|502.20999900000004|
|        503.109993|         502.12001|
|506.88001299999996|        514.850021|
|        513.079994|        513.039993|
|        515.079987| 516.3899769999999|
| 519.6699980000001| 522.4099809999999|
|        521.309982|        525.760017|
|        527.960014|        535.410011|
| 541.5600049999999|        542.440025|
|        548.169983| 544.4699780000001|
|        544.240013|        545.180008|
|        545.420013| 533.1600269999999|
|        523.659996|        530.259987|
| 536.8000030000001| 530.6900099999999|
| 534.6899950000001|        541.989975|
|        544.209999|        545.170021|
| 548.9799879999999|        551.999977|
|        557.540024|        568.099998|
+------------------+------------------+
only showing top 20 rows



In [0]:
# Same filter using a Column expression instead of a SQL string (where == filter)
df1.where(df1['Close'] > 500).show()

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

In [0]:
# Volume on days that closed above 500
df1.where(df1['Close'] > 500).select(['Volume']).show()

+---------+
|   Volume|
+---------+
|129304000|
|115099600|
|236138000|
|133951300|
|151398800|
|120825600|
|142006900|
|103768000|
|136895500|
|150096800|
|238002800|
|170817500|
|107928100|
|202281100|
|202559700|
|199630200|
|129114300|
|104729800|
|101820600|
|172713800|
+---------+
only showing top 20 rows



In [0]:
# Multiple conditions: wrap each comparison in parentheses and combine with & (and)
df1.where(
    (df1['Close'] > 500) & (df1['Open'] > 700)
).show()

+----------+----------+----------+-----------------+----------+---------+-----------------+
|      Date|      Open|      High|              Low|     Close|   Volume|        Adj Close|
+----------+----------+----------+-----------------+----------+---------+-----------------+
|2012-09-19|700.259979|703.989998|       699.569977|702.100021| 81718700|91.35431700000001|
|2012-09-21|702.409988|705.070023|699.3599849999999|700.089989|142897300|         91.09278|
+----------+----------+----------+-----------------+----------+---------+-----------------+



In [0]:
# collect() brings the matching rows to the driver as a Python list of Row objects.
# NOTE: exact == on a double column only matches if the stored value is exactly 197.16
result = df1.where(df1['Low'] == 197.16).collect()
result

Out[39]: [Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [0]:
 result[0].asDict()

Out[40]: {'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [0]:
# Pull a single value out of a Row by field name (same value as asDict()['Volume'])
result[0]['Volume']

Out[41]: 220441900

### GroupBy and aggregate operations

In [0]:
from pyspark.sql import session

In [0]:
# getOrCreate() hands back the session that already exists in this notebook;
# NOTE(review): the 'agg' app name is presumably not applied to it — confirm on this runtime
spark = (
    SparkSession.builder
    .appName('agg')
    .getOrCreate()
)

In [0]:
# Load the sales CSV: first line is the header, column types are inferred.
# Each option is given once (previously `header` was set twice).
df = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/veeraraghavan.cv@gmail.com/sales_info.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the sales data
df.show(n=20, truncate=True)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [0]:
# Inferred schema: Company and Person are strings, Sales is double
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [0]:
# Number of rows per company (groupBy spelling; `groupby` is only an alias)
df.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [0]:
# Group once, then aggregate with a {column: function-name} mapping
grouped_data = df.groupBy("Company")
grouped_data.agg({"Sales": "sum"}).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [0]:
# Sql functions
from pyspark.sql.functions import countDistinct, avg, stddev

In [0]:
# Number of distinct Sales values across the whole DataFrame
df.select(countDistinct(df['Sales'])).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [0]:
# Mean of Sales, renamed via alias for a readable header
df.select(avg(df['Sales']).alias('AverageSales')).show()

+-----------------+
|     AverageSales|
+-----------------+
|360.5833333333333|
+-----------------+



In [0]:
# Standard deviation of Sales (the header shows it is the sample version, stddev_samp)
df.select(stddev(df['Sales'])).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [0]:
# How to round off the number above
from  pyspark.sql.functions import format_number

In [0]:
# Compute the standard deviation, then format it to 2 decimal places for display
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.select(
    format_number(sales_std['std'], 2).alias('final_std')
).show()

+---------+
|final_std|
+---------+
|   250.09|
+---------+



In [0]:
# Sorting
# Ascending is orderBy's default; spelled out here for contrast with the next cell
df.orderBy('Sales', ascending=True).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [0]:
# Descending order: call .desc() on the Column (a bare column name sorts ascending).
# orderBy('Sales', ascending=False) is an equivalent alternative.
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

