In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [4]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [5]:
data_schema = [StructField('Region', StringType(), True),
				StructField('Exchange', StringType(), True),
				StructField('Index', StringType(), True),
				StructField('Currency', StringType(), True)]

final_struct = StructType(data_schema)

In [6]:
df = spark.read.option("header", True).csv('../data/stock-data/indexInfo.csv', schema=final_struct)

In [7]:
df.show()

+-------------+--------------------+---------+--------+
|       Region|            Exchange|    Index|Currency|
+-------------+--------------------+---------+--------+
|United States|New York Stock Ex...|      NYA|     USD|
|United States|              NASDAQ|     IXIC|     USD|
|    Hong Kong|Hong Kong Stock E...|      HSI|     HKD|
|        China|Shanghai Stock Ex...|000001.SS|     CNY|
|        Japan|Tokyo Stock Exchange|     N225|     JPY|
|       Europe|            Euronext|     N100|     EUR|
|        China|Shenzhen Stock Ex...|399001.SZ|     CNY|
|       Canada|Toronto Stock Exc...|   GSPTSE|     CAD|
|        India|National Stock Ex...|     NSEI|     INR|
|      Germany|Frankfurt Stock E...|    GDAXI|     EUR|
|        Korea|      Korea Exchange|     KS11|     KRW|
|  Switzerland|  SIX Swiss Exchange|     SSMI|     CHF|
|       Taiwan|Taiwan Stock Exch...|     TWII|     TWD|
| South Africa|Johannesburg Stoc...|  J203.JO|     ZAR|
+-------------+--------------------+---------+--

In [8]:
df.columns

['Region', 'Exchange', 'Index', 'Currency']

In [9]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Exchange: string (nullable = true)
 |-- Index: string (nullable = true)
 |-- Currency: string (nullable = true)



In [10]:
df['Region']

Column<'Region'>

In [11]:
df.select('Region')

DataFrame[Region: string]

In [12]:
df.select('Region').show()

+-------------+
|       Region|
+-------------+
|United States|
|United States|
|    Hong Kong|
|        China|
|        Japan|
|       Europe|
|        China|
|       Canada|
|        India|
|      Germany|
|        Korea|
|  Switzerland|
|       Taiwan|
| South Africa|
+-------------+



In [15]:
df.head(2)[1]

Row(Region='United States', Exchange='NASDAQ', Index='IXIC', Currency='USD')

In [16]:
type(df.head(2)[1])

pyspark.sql.types.Row

In [17]:
df.select(['Region','Currency']).show()

+-------------+--------+
|       Region|Currency|
+-------------+--------+
|United States|     USD|
|United States|     USD|
|    Hong Kong|     HKD|
|        China|     CNY|
|        Japan|     JPY|
|       Europe|     EUR|
|        China|     CNY|
|       Canada|     CAD|
|        India|     INR|
|      Germany|     EUR|
|        Korea|     KRW|
|  Switzerland|     CHF|
|       Taiwan|     TWD|
| South Africa|     ZAR|
+-------------+--------+



In [21]:
df.withColumnRenamed('Region', 'Country').head(2)

[Row(Country='United States', Exchange='New York Stock Exchange', Index='NYA', Currency='USD'),
 Row(Country='United States', Exchange='NASDAQ', Index='IXIC', Currency='USD')]

In [19]:
df.columns

['Region', 'Exchange', 'Index', 'Currency']

In [22]:
df.createOrReplaceTempView('indices')

In [26]:
res = spark.sql("select * from indices")

In [27]:
res.head(2)

[Row(Region='United States', Exchange='New York Stock Exchange', Index='NYA', Currency='USD'),
 Row(Region='United States', Exchange='NASDAQ', Index='IXIC', Currency='USD')]

In [28]:
res_2 = spark.sql("select * from indices where region = 'China'")

In [29]:
res_2.show()

+------+--------------------+---------+--------+
|Region|            Exchange|    Index|Currency|
+------+--------------------+---------+--------+
| China|Shanghai Stock Ex...|000001.SS|     CNY|
| China|Shenzhen Stock Ex...|399001.SZ|     CNY|
+------+--------------------+---------+--------+



In [4]:
spark = SparkSession.builder.appName("Stocks").getOrCreate()

In [5]:
df = spark.read.csv("../data/stock-data/indexData.csv", inferSchema=True, header=True)

In [33]:
df.head(2)

[Row(Index='NYA', Date=datetime.date(1965, 12, 31), Open='528.690002', High='528.690002', Low='528.690002', Close='528.690002', Adj Close='528.690002', Volume='0'),
 Row(Index='NYA', Date=datetime.date(1966, 1, 3), Open='527.210022', High='527.210022', Low='527.210022', Close='527.210022', Adj Close='527.210022', Volume='0')]

In [34]:
df.printSchema()

root
 |-- Index: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [38]:
df.filter("Volume > 1000").head(2)

[Row(Index='NYA', Date=datetime.date(2001, 1, 3), Open='6968.830078', High='6968.830078', Low='6968.830078', Close='6968.830078', Adj Close='6968.830078', Volume='1880700000'),
 Row(Index='NYA', Date=datetime.date(2001, 1, 5), Open='6774.060059', High='6774.060059', Low='6774.060059', Close='6774.060059', Adj Close='6774.060059', Volume='1430800000')]

In [39]:
df.filter("Volume > 1000").select(['Index', 'Date']).head(2)

[Row(Index='NYA', Date=datetime.date(2001, 1, 3)),
 Row(Index='NYA', Date=datetime.date(2001, 1, 5))]

In [44]:
df.filter( (df['open'] < 500) & ~(df['volume'] > 100000) ).head(2)

[Row(Index='NYA', Date=datetime.date(1966, 5, 9), Open='493.480011', High='493.480011', Low='493.480011', Close='493.480011', Adj Close='493.480011', Volume='0'),
 Row(Index='NYA', Date=datetime.date(1966, 5, 10), Open='497.709991', High='497.709991', Low='497.709991', Close='497.709991', Adj Close='497.709991', Volume='0')]

In [6]:
res = df.filter(df['Adj Close'] == 497.709991).collect()

In [8]:
res[1]['Volume']

'328528500'

In [11]:
df.show()

+-----+----------+----------+----------+----------+----------+----------+------+
|Index|      Date|      Open|      High|       Low|     Close| Adj Close|Volume|
+-----+----------+----------+----------+----------+----------+----------+------+
|  NYA|1965-12-31|528.690002|528.690002|528.690002|528.690002|528.690002|     0|
|  NYA|1966-01-03|527.210022|527.210022|527.210022|527.210022|527.210022|     0|
|  NYA|1966-01-04|527.840027|527.840027|527.840027|527.840027|527.840027|     0|
|  NYA|1966-01-05|531.119995|531.119995|531.119995|531.119995|531.119995|     0|
|  NYA|1966-01-06|532.070007|532.070007|532.070007|532.070007|532.070007|     0|
|  NYA|1966-01-07|532.599976|532.599976|532.599976|532.599976|532.599976|     0|
|  NYA|1966-01-10|533.869995|533.869995|533.869995|533.869995|533.869995|     0|
|  NYA|1966-01-11|534.289978|534.289978|534.289978|534.289978|534.289978|     0|
|  NYA|1966-01-12|533.340027|533.340027|533.340027|533.340027|533.340027|     0|
|  NYA|1966-01-13|534.400024

In [14]:
df.groupBy('index').count().show()

+---------+-----+
|    index|count|
+---------+-----+
|     NSEI| 3381|
|   GSPTSE|10776|
|      NYA|13948|
|      HSI| 8750|
|399001.SZ| 5928|
|     IXIC|12690|
|000001.SS| 5963|
|     KS11| 6181|
|     SSMI| 7830|
|    GDAXI| 8606|
|     TWII| 6010|
|     N225|14500|
|     N100| 5507|
|  J203.JO| 2387|
+---------+-----+



In [18]:
df.agg({'index': 'count'}).show()

+------------+
|count(index)|
+------------+
|      112457|
+------------+



In [19]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [23]:
df.agg(countDistinct('index').alias('No of indices')).show()

+-------------+
|No of indices|
+-------------+
|           14|
+-------------+



In [25]:
from pyspark.sql.functions import format_number

In [29]:
df.agg(format_number(stddev('open'),2).alias('std')).show()

+--------+
|     std|
+--------+
|9,011.48|
+--------+



In [30]:
df.orderBy('volume').show()

+-----+----------+-----------+-----------+-----------+-----------+-----------+------+
|Index|      Date|       Open|       High|        Low|      Close|  Adj Close|Volume|
+-----+----------+-----------+-----------+-----------+-----------+-----------+------+
| TWII|1997-07-24|9500.860352|9632.419922|9475.540039|9632.419922|9632.383789|     0|
|  NYA|1965-12-31| 528.690002| 528.690002| 528.690002| 528.690002| 528.690002|     0|
| TWII|1997-08-22|9992.280273|10056.16016|9979.730469|10020.54981|10020.51172|     0|
|  NYA|1966-01-03| 527.210022| 527.210022| 527.210022| 527.210022| 527.210022|     0|
| TWII|1997-07-25| 9719.05957|9819.769531|9710.269531|9808.910156|9808.873047|     0|
|  NYA|1966-01-04| 527.840027| 527.840027| 527.840027| 527.840027| 527.840027|     0|
| TWII|1997-07-23|9694.280273|9694.280273|9345.900391| 9381.05957|9381.025391|     0|
|  NYA|1966-01-05| 531.119995| 531.119995| 531.119995| 531.119995| 531.119995|     0|
| TWII|1997-07-28|9929.980469|9995.370117|9899.580078|

In [37]:
df.orderBy(df['date'].desc()).head(2)

[Row(Index='N225', Date=datetime.date(2021, 6, 3), Open='28890.39063', High='29157.16016', Low='28879.15039', Close='29058.10938', Adj Close='29058.10938', Volume='0'),
 Row(Index='399001.SZ', Date=datetime.date(2021, 6, 2), Open='15044.94043', High='15051.20996', Low='14793.74023', Close='14857.91016', Adj Close='14857.91016', Volume='1470800')]