In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Obtain the notebook's Spark session (an existing one is reused, otherwise a
# new one is created). The master URL "local[*]" runs Spark on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('PySpark_Tutorial')
    .getOrCreate()
)
# Bare last expression so the notebook displays the session.
spark

### Structuring Data Using Spark Schema

In [2]:
from pyspark.sql.types import *

# Column layout handed to the CSV reader, as (column name, Spark type) pairs
# in file order. Every column is declared nullable.
# NOTE(review): IntegerType is a 32-bit signed integer; if 'volume' can exceed
# 2,147,483,647 in other input files, LongType would be the safer choice.
column_types = [
    ('_c0', IntegerType()),
    ('symbol', StringType()),
    ('date', DateType()),
    ('open', DoubleType()),
    ('high', DoubleType()),
    ('low', DoubleType()),
    ('close', DoubleType()),
    ('volume', IntegerType()),
    ('adjusted', DoubleType()),
    ('market.cap', StringType()),
    ('sector', StringType()),
    ('industry', StringType()),
    ('exchange', StringType()),
]

data_schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in column_types
]

final_struc = StructType(fields=data_schema)

# Load the sample CSV with the explicit schema defined above rather than
# letting Spark infer column types; the first line of the file is a header.
df = (
    spark.read
    .option('sep', ',')
    .option('header', True)
    .schema(final_struc)
    .csv('stocks_price_sample1.csv')
)

# Confirm the DataFrame carries the declared column types.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [3]:
# distinct() returns a new DataFrame; nothing is printed row-wise here, so the
# cell output is just the repr (column names and types) of that DataFrame.
df.distinct()

DataFrame[_c0: int, symbol: string, date: date, open: double, high: double, low: double, close: double, volume: int, adjusted: double, market.cap: string, sector: string, industry: string, exchange: string]

In [4]:
# Peek at the first record: head() with no argument yields a single Row.
df.head()

Row(_c0=1, symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')

In [5]:
# Print the DataFrame as a text table; long string values are cut off with "...".
df.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [6]:
# Number of rows in the DataFrame.
df.count()

29

### Filtering

In [7]:
from pyspark.sql.functions import col, lit

# Keep rows dated 2019-09-11 through 2019-10-01 (both bounds included) and
# preview the first five matches.
df.where(col('date').between('2019-09-11', '2019-10-01')).show(5)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [8]:
# Keep rows whose adjusted price lies within [50.0, 500.0], bounds included.
in_price_band = df['adjusted'].between(50.0, 500.0)
df.where(in_price_band).show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [9]:
from pyspark.sql.functions import *
# Label every adjusted price with a bucket code:
#   1 -> adjusted <= 50.0
#   2 -> 55 <= adjusted <= 57
#   0 -> everything else
# Branches are evaluated in order, so the first matching condition wins.
adjusted_price = df['adjusted']
price_bucket = (
    when(adjusted_price <= 50.0, 1)
    .when(adjusted_price.between(55, 57), 2)
    .otherwise(0)
    .alias('result')
)
df.select('adjusted', price_bucket).show()

+---------+------+
| adjusted|result|
+---------+------+
|    52.75|     0|
|    52.27|     0|
|55.200001|     2|
|56.779999|     2|
|     62.0|     0|
|61.119999|     0|
|     60.5|     0|
|60.330002|     0|
|54.299999|     0|
|52.759998|     0|
|49.990002|     1|
|51.029999|     0|
|50.400002|     0|
|47.029999|     1|
|    46.07|     1|
|48.119999|     1|
|51.450001|     0|
|50.360001|     0|
|49.549999|     1|
|50.009998|     0|
+---------+------+
only showing top 20 rows



In [10]:
# Flag each distinct sector whose name begins with "B" or "C".
# The character class is written [BC] on purpose: a comma inside a regex
# character class is matched literally, so '[B,C]' would also accept names
# that start with ','.
df.select('sector', 
            df.sector.rlike('^[BC]').alias('Sector Starting with B or C')
            ).distinct().show()

+-------------+---------------------------+
|       sector|Sector Starting with B or C|
+-------------+---------------------------+
|  Health Care|                      false|
|Capital Goods|                       true|
+-------------+---------------------------+



In [11]:
# SQL LIKE with the pattern 'Ca%' is true for sector names that begin with
# "Ca" ('%' matches any run of characters). The column label says exactly
# that, so the header matches the predicate being demonstrated.
df.select('sector', 
            df.sector.like('Ca%').alias('Sector Starting with Ca')
            ).show(30)

+-------------+---------------------------+
|       sector|Sector Starting with B or C|
+-------------+---------------------------+
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                       true|
|Capital Goods|                 

### Filtering with Aggregation

In [12]:
# Average open, close and adjusted price for each industry.
df.groupBy('industry').mean('open', 'close', 'adjusted').show()

+--------------------+-----------------+-----------------+-----------------+
|            industry|        avg(open)|       avg(close)|    avg(adjusted)|
+--------------------+-----------------+-----------------+-----------------+
|Biotechnology: Bi...|35.87111111111111|35.95777866666666|35.95777866666666|
|Biotechnology: La...|      53.56049965|       53.1009999|       53.1009999|
+--------------------+-----------------+-----------------+-----------------+



In [13]:
# Per sector: lowest opening price and highest closing price.
# The aggregates come from the `F` namespace (pyspark.sql.functions) so this
# cell does not depend on Python's built-in min/max having been shadowed by a
# wildcard import.
(
    df.select(['sector', 'open', 'close'])
    .groupBy('sector')
    .agg(F.min('open'), F.max('close'))
    .show()
)

+-------------+---------+----------+
|       sector|min(open)|max(close)|
+-------------+---------+----------+
|  Health Care|34.349998| 38.240002|
|Capital Goods|    46.77|      62.0|
+-------------+---------+----------+



In [14]:
# Highest opening price over the whole DataFrame.
# F.max is the Spark column aggregate; naming it through `F` keeps it distinct
# from Python's built-in max.
df.select(F.max("open")).show(truncate=False)

+---------+
|max(open)|
+---------+
|62.810001|
+---------+



In [17]:
# One-row summary over the whole DataFrame: mean open, max close, min adjusted.
# Aggregates are referenced through `F` (pyspark.sql.functions) rather than as
# bare names, so Python's built-in min/max are not involved.
df.select(
    F.mean("open").alias("Rata Open"),
    F.max("close").alias("Max Close"),
    F.min("adjusted").alias("Min Adjusted"),
).show()

+----------------+---------+------------+
|       Rata Open|Max Close|Min Adjusted|
+----------------+---------+------------+
|48.0706894137931|     62.0|   34.189999|
+----------------+---------+------------+



In [19]:
# Same one-row summary (mean open, max close, min adjusted), but materialised
# with collect() and kept in `result` instead of being printed.
# Aggregates are referenced through `F` (pyspark.sql.functions) rather than as
# bare names, so Python's built-in min/max are not involved.
result = df.select(
    F.mean("open").alias("Rata Open"),
    F.max("close").alias("Max Close"),
    F.min("adjusted").alias("Min Adjusted"),
).collect()