## Spark Dataframe Basic

In [15]:
# !java -version


openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-0ubuntu0.16.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)


In [None]:
import findspark
findspark.init('/home/amit/spark-2.1.0-bin-hadoop2.7')

In [None]:
from pyspark.sql import SparkSession

In [4]:
# May take a little while on a local computer
app = SparkSession.builder.appName("Basics").getOrCreate()

In [15]:
df = app.read.json('./Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json')

In [16]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [20]:
df.columns

['age', 'name']

In [22]:
df.describe()

# returns a dataframe with statistics

DataFrame[summary: string, age: string, name: string]

In [23]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In above case the schema was inferred correctly . But in case we need to specify the schema following can be done

In [24]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [26]:
# StructField(
data_schema = [StructField('age',IntegerType(), True),
              StructField('name',StringType(), True)]

In [28]:
final_struc = StructType(data_schema)

In [29]:
df = app.read.json('./Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json',schema=final_struc)

In [30]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [31]:
df.show()


+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Dataframe Basics - part two

In [33]:
df['age']

# return a column object

Column<b'age'>

In [39]:
df.select('age')
# returns a dataframe object - which can then be shown

DataFrame[age: int]

In [40]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [41]:
# selecting the ROWS
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [46]:
print(type(df.head(2)[0]))
df.head(2)[0]

<class 'pyspark.sql.types.Row'>


Row(age=None, name='Michael')

#### Add or Rename a column to a Dataframe


In [51]:
df.withColumn('doubleage',df['age']*2).show()
# operation is NOT INPLACE

+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|null|Michael|     null|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+



In [52]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [53]:
df.withColumnRenamed('age', 'newage').show()

+------+-------+
|newage|   name|
+------+-------+
|  null|Michael|
|    30|   Andy|
|    19| Justin|
+------+-------+



#### Execute sql query on Dataframe

In [57]:
df.createOrReplaceTempView('people_view')

In [59]:
results = app.sql('select * from people_view')

In [60]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [62]:
new_results = app.sql("SELECT * FROM people_view WHERE age >=30")
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## Spark Dataframe Basic Opeartions

In [65]:
df = app.read.csv('./Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv', inferSchema=True, header=True)

In [67]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [66]:
df.show()

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:...|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:...|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:...|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:...|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:...|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902

#### Filter the Data

In [76]:
print(df.filter('close < 500').select('open').count())
df.filter('close < 500').select(['open','close']).show()

1359
+------------------+------------------+
|              open|             close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [80]:
df.filter(df['close'] <500).select(['open','close']).show()

+------------------+------------------+
|              open|             close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [84]:
df.filter((df['close'] <200) & ~(df['open'] >200)).show()

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01 00:00:...|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:...|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:...|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:...|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05 00:00:...|192.63000300000002|             196.0|        190.850002|        195.460001|212576

In [85]:
df.filter(df['low']==197.16).show()

+--------------------+------------------+----------+------+------+---------+---------+
|                Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+--------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:...|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+--------------------+------------------+----------+------+------+---------+---------+



In [86]:
result = df.filter(df['low']==197.16).collect()

In [87]:
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [89]:
row = result[0]


In [90]:
row.asDict()


{'Adj Close': 25.620401,
 'Close': 197.75,
 'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'High': 207.499996,
 'Low': 197.16,
 'Open': 206.78000600000001,
 'Volume': 220441900}