In [2]:
import findspark

In [3]:
# Point findspark at the local Spark 2.2.0 installation before importing pyspark.
# NOTE(review): absolute, machine-specific path -- this cell will not run
# unchanged on another machine.
findspark.init('/Users/kiranrudresha/Documents/spark/spark-2.2.0-bin-hadoop2.7')

In [6]:
from pyspark.sql import SparkSession

In [7]:
# Create (or reuse) the SparkSession for this walkthrough, named 'df basics'.
spark = SparkSession.builder.appName('df basics').getOrCreate()

In [14]:
# Load the people.json example shipped with Spark; no schema is given here,
# so the column types are whatever the reader infers.
df = spark.read.json('/Users/kiranrudresha/Documents/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json')

In [15]:
# Print the rows as a text table (a missing age is rendered as null).
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [16]:
# Show each column's inferred type and nullability.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [17]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [18]:
# describe() returns another DataFrame; evaluated on its own it displays only
# that DataFrame's repr (column names/types), not the statistics themselves.
df.describe()

DataFrame[summary: string, age: string, name: string]

In [19]:
# Display the summary statistics (count, mean, stddev, min, max) per column.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [20]:
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)

In [31]:
# Explicit field list for people.json: an integer age and a string name,
# both allowed to be null.
data_schema = [
    StructField(name='age', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
]

In [32]:
# Wrap the field list in a StructType so it can be passed to the reader as a schema.
final_struct = StructType(fields=data_schema)

In [33]:
# Re-read the same file with the explicit schema instead of inference.
# This rebinds `df`, replacing the DataFrame loaded earlier.
df = spark.read.json('/Users/kiranrudresha/Documents/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json',
                      schema=final_struct)

In [34]:
# Confirm the explicit schema took effect: age is now reported as integer.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [35]:
# Display the rows read with the explicit schema.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [36]:
# Indexing by column name yields a Column object, not the column's values.
df['age']

Column<b'age'>

In [37]:
# select() produces a DataFrame holding just the requested column, which can be shown.
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [38]:
# Check the return type of select(): a DataFrame.
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [43]:
# head(n) returns a plain Python list.
type(df.head(2))

list

In [53]:
# Each element of the list returned by head() is a Row.
type(df.head(2)[0])

pyspark.sql.types.Row

In [54]:
# Passing a list of names to select() picks several columns at once.
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [55]:
# A multi-column select() is still a DataFrame.
type(df.select(['age','name']))

pyspark.sql.dataframe.DataFrame

## Add new column

In [56]:
# Show a copy of df with an extra column computed from age (null stays null).
# The result is not assigned, so `df` itself is left unchanged.
df.withColumn('double_age',df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



## Column Rename

In [57]:
# Show a copy of df with 'age' renamed to 'new_age'; the result is not
# assigned, so `df` keeps its original column names.
df.withColumnRenamed('age','new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



## Temporary View

In [58]:
# Register df under the name 'people' so it can be queried with SQL.
df.createOrReplaceTempView('people')

In [59]:
# Query the 'people' view with SQL and display the result.
spark.sql('select * from people').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [2]:
import findspark

In [3]:
# Point findspark at the local Spark 2.2.0 installation before importing pyspark.
# NOTE(review): absolute, machine-specific path -- this cell will not run
# unchanged on another machine.
findspark.init('/Users/kiranrudresha/Documents/spark/spark-2.2.0-bin-hadoop2.7')

In [7]:
# The cell that imports SparkSession for this section is placed after this one
# in document order (it was merely executed earlier), so import it here to keep
# the cell runnable top-to-bottom on a fresh kernel.
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for the stock analysis, named 'Apple stocks'.
spark = SparkSession.builder.appName('Apple stocks').getOrCreate()

In [4]:
from pyspark.sql import SparkSession

In [11]:
# Load the Apple stock CSV: header=True takes column names from the first row,
# inferSchema=True asks the reader to infer column types rather than use strings.
# NOTE(review): absolute, machine-specific path.
df1 = spark.read.csv('/Users/kiranrudresha/Documents/spark/pyspark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv',inferSchema=True,header=True)

In [12]:
# Peek at the first five records as a list of Row objects.
df1.head(5)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
 Row(Date=datetime.datetime(2010, 1, 7, 0, 0), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
 Row(Date=datetime.datetime(2010, 1, 8, 0, 0), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)]

In [13]:
# Inspect the column types produced by inferSchema.
df1.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [14]:
# Display the stock data as a text table.
df1.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [18]:
# Index into the list returned by head() to get a single Row.
df1.head(5)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [19]:
# filter() accepts a SQL-style condition string: keep rows whose Open exceeds 200.
df1.filter("Open > 200").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [27]:
# Keep only the rows whose closing price is above 200, then show that subset
# twice: first the close column alone, then close next to Open.
close_above_200 = df1.filter(df1['close'] > 200)
close_above_200.select('close').show()
close_above_200.select(['close','Open']).show()

+------------------+
|             close|
+------------------+
|        214.009998|
|        214.379993|
|        210.969995|
|            210.58|
|211.98000499999998|
|210.11000299999998|
|        207.720001|
|        210.650002|
|            209.43|
|            205.93|
|        215.039995|
|            211.73|
|        208.069996|
|        203.070002|
|        205.940001|
|        207.880005|
|200.37999299999998|
|        203.399996|
|        202.550003|
|        202.929998|
+------------------+
only showing top 20 rows

+------------------+------------------+
|             close|              Open|
+------------------+------------------+
|        214.009998|        213.429998|
|        214.379993|        214.599998|
|        210.969995|        214.379993|
|            210.58|            211.75|
|211.98000499999998|        210.299994|
|210.11000299999998|212.79999700000002|
|        207.720001|209.18999499999998|
|        210.650002|        207.870005|
|            209.43|210.110002

In [31]:
# Rows that closed above 200 although they did not open above 200.
# Column conditions are combined with & and negated with ~.
closed_over_200 = df1['close'] > 200
opened_over_200 = df1['Open'] > 200
df1.filter(closed_over_200 & ~opened_over_200).show()

+-------------------+----------+------------------+----------+------------------+---------+------------------+
|               Date|      Open|              High|       Low|             Close|   Volume|         Adj Close|
+-------------------+----------+------------------+----------+------------------+---------+------------------+
|2010-02-12 00:00:00|198.109995|        201.639996|195.500002|200.37999299999998|163867200|25.961142000000002|
|2010-02-24 00:00:00|198.229998|201.44000400000002|197.840002|            200.66|115141600|         25.997419|
|2010-02-25 00:00:00|197.380005|        202.859997|196.889994|        202.000004|166281500|          26.17103|
+-------------------+----------+------------------+----------+------------------+---------+------------------+



In [33]:
# Look up the row whose Open equals this exact value.
# NOTE(review): exact equality on a double column only matches when the
# literal is identical to the stored value, as it is here.
df1.filter( df1['Open'] == 198.109995).show()

+-------------------+----------+----------+----------+------------------+---------+------------------+
|               Date|      Open|      High|       Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+----------+------------------+---------+------------------+
|2010-02-12 00:00:00|198.109995|201.639996|195.500002|200.37999299999998|163867200|25.961142000000002|
+-------------------+----------+----------+----------+------------------+---------+------------------+



In [34]:
# collect() returns the matching rows as a Python list of Row objects
# instead of printing them.
result1 = df1.filter( df1['Open'] == 198.109995).collect()

In [35]:
# The collected result: a list holding the single matching Row.
result1

[Row(Date=datetime.datetime(2010, 2, 12, 0, 0), Open=198.109995, High=201.639996, Low=195.500002, Close=200.37999299999998, Volume=163867200, Adj Close=25.961142000000002)]

In [36]:
# Index into the list to get the Row itself.
result1[0]

Row(Date=datetime.datetime(2010, 2, 12, 0, 0), Open=198.109995, High=201.639996, Low=195.500002, Close=200.37999299999998, Volume=163867200, Adj Close=25.961142000000002)

In [39]:
# Keep the matching Row under a name for further inspection.
row = result1[0]

In [41]:
# Convert the Row to a dict and read one field by its column name.
row.asDict()['Volume']

163867200

In [8]:
# Load the sales CSV with header names and inferred column types.
# NOTE(review): this rebinds `df1`, which earlier held the Apple stock data;
# a distinct name would avoid confusion. The path is also machine-specific.
df1 = spark.read.csv('/Users/kiranrudresha/Documents/spark/pyspark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/sales_info.csv',inferSchema=True,header=True)

In [43]:
# Number of rows in the sales data.
df1.count()

12

In [44]:
# Display every sales record (Company, Person, Sales).
df1.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [52]:
# Total the Sales column per company. The second expression confirms that
# groupBy() on its own yields a GroupedData object rather than a DataFrame.
df1.groupBy('Company').sum().show()
type(df1.groupBy('company'))

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



pyspark.sql.group.GroupedData

In [49]:
# Keep the grouped-by-company object so aggregations can be applied to it.
group_data = df1.groupBy('company')

In [54]:
# agg() takes a {column: aggregate-name} dict; 'mean' on Sales is reported
# in the output as avg(Sales).
group_data.agg({'Sales':'mean'}).show()

+-------+-----------------+
|company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [4]:
from pyspark.sql.functions import countDistinct, avg,stddev

In [9]:
# Count how many distinct values the sales column contains.
df1.select(countDistinct('sales')).show()

+---------------------+
|count(DISTINCT sales)|
+---------------------+
|                   11|
+---------------------+



In [10]:
# alias() renames the result column.
# NOTE(review): the label 'Avg Sales' is misleading -- the value is the
# distinct count of sales, not an average.
df1.select(countDistinct('sales').alias('Avg Sales')).show()

+---------+
|Avg Sales|
+---------+
|       11|
+---------+



In [11]:
from pyspark.sql.functions import format_number

In [14]:
# Standard deviation of sales, with the result column labelled 'std'.
df1.select(stddev('sales').alias('std')).show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [15]:
# Keep the standard-deviation result as a DataFrame so it can be formatted.
sale_std = df1.select(stddev('sales').alias('std'))

In [16]:
# Format the value to two decimal places for display. Without an alias, the
# output column is named after the expression: format_number(std, 2).
sale_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



In [17]:
# The aggregate result is itself a DataFrame.
type(sale_std)

pyspark.sql.dataframe.DataFrame

In [18]:
# Its single column 'std' is a double.
sale_std.printSchema()

root
 |-- std: double (nullable = true)



In [20]:
# Sort by Sales; given just a column name, the output is in ascending order.
df1.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



## Order by descending

In [21]:
# Sort by Sales from highest to lowest by calling desc() on the column.
df1.orderBy(df1['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

