## SPARK SQL APPLICATION

In [2]:
print('Hello World!')

Hello World!


In [3]:
2+4

6

## SPARK CONTEXT

In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [5]:
type(sc)

pyspark.context.SparkContext

In [7]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  SparkContext(master: Optional[str] = None, appName: Optional[str] = None, sparkHome: Optional[str] = None, pyFiles: Optional[List[str]] = None, environment: Optional[Dict[str, Any]] = None, batchSize: int = 0, serializer: 'Serializer' = CloudPickleSerializer(), conf: Optional[pyspark.conf.SparkConf] = None, gateway: Optional[py4j.java_gateway.JavaGateway] = None, jsc: Optional[py4j.java_gateway.JavaObject] = None, profiler_cls: Type[pyspark.profiler.BasicProfiler] = <class 'pyspark.profiler.BasicProfiler'>, udf_profiler_cls: Type[pyspark.profiler.UDFBasicProfiler] = <class 'pyspark.profiler.UDFBasicProfiler'>, memory_profiler_cls: Type[pyspark.profiler.MemoryProfiler] = <class 'pyspark.profiler.MemoryProfiler'>)
 |  
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create :class:`RDD` and
 |  broadcast variables

In [6]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addArchive',
 'addFile',
 'addPyFile',
 'appName',
 'app

## RDD

In [8]:
data = [1,2,3,4]
distributedData = sc.parallelize(data)

In [9]:
tesla_file = 'TSLA.csv'
tesla_rdd = sc.textFile(tesla_file)
tesla_rdd.take(5)

['Date,Open,High,Low,Close,AdjClose,Volume',
 '2019-07-15,248.000000,254.419998,244.860001,253.500000,253.500000,11000100',
 '2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000',
 '2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700',
 '2019-07-18,255.050003,255.750000,251.889999,253.539993,253.539993,4764500']

In [10]:
type(tesla_rdd)

pyspark.rdd.RDD

In [11]:
csv_rdd = tesla_rdd.map(lambda row:row.split(","))

In [12]:
csv_rdd.collect()

[['Date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume'],
 ['2019-07-15',
  '248.000000',
  '254.419998',
  '244.860001',
  '253.500000',
  '253.500000',
  '11000100'],
 ['2019-07-16',
  '249.300003',
  '253.529999',
  '247.929993',
  '252.380005',
  '252.380005',
  '8149000'],
 ['2019-07-17',
  '255.669998',
  '258.309998',
  '253.350006',
  '254.860001',
  '254.860001',
  '9764700'],
 ['2019-07-18',
  '255.050003',
  '255.750000',
  '251.889999',
  '253.539993',
  '253.539993',
  '4764500'],
 ['2019-07-19',
  '255.690002',
  '259.959991',
  '254.619995',
  '258.179993',
  '258.179993',
  '7048400'],
 ['2019-07-22',
  '258.750000',
  '262.149994',
  '254.190002',
  '255.679993',
  '255.679993',
  '6842400'],
 ['2019-07-23',
  '256.709991',
  '260.480011',
  '254.500000',
  '260.170013',
  '260.170013',
  '5023100'],
 ['2019-07-24',
  '259.170013',
  '266.070007',
  '258.160004',
  '264.880005',
  '264.880005',
  '11072800'],
 ['2019-07-25',
  '233.500000',
  '234.500000',
  '22

## SPARK DATAFRAMES

In [14]:
# Old way
# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

# New way
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


In [15]:
li = [('Alice',1)]
df = spark.createDataFrame(li, ['name', 'age'])
df.collect()

[Row(name='Alice', age=1)]

In [17]:
tesla_df = csv_rdd.toDF(['date','open','high','low','close','adjclose','volume'])
tesla_df.take(5)

[Row(date='Date', open='Open', high='High', low='Low', close='Close', adjclose='AdjClose', volume='Volume'),
 Row(date='2019-07-15', open='248.000000', high='254.419998', low='244.860001', close='253.500000', adjclose='253.500000', volume='11000100'),
 Row(date='2019-07-16', open='249.300003', high='253.529999', low='247.929993', close='252.380005', adjclose='252.380005', volume='8149000'),
 Row(date='2019-07-17', open='255.669998', high='258.309998', low='253.350006', close='254.860001', adjclose='254.860001', volume='9764700'),
 Row(date='2019-07-18', open='255.050003', high='255.750000', low='251.889999', close='253.539993', adjclose='253.539993', volume='4764500')]

In [18]:
amazon_file = 'AMZN.csv'
amazon_df = spark.read.load(amazon_file, format='com.databricks.spark.csv',
                            header='true', inferSchema='true')

In [19]:
amazon_df.take(5)

[Row(Date=datetime.date(2019, 7, 15), Open=2021.400024, High=2022.900024, Low=2001.550049, Close=2020.98999, AdjClose=2020.98999, Volume=2981300),
 Row(Date=datetime.date(2019, 7, 16), Open=2010.579956, High=2026.319946, Low=2001.219971, Close=2009.900024, AdjClose=2009.900024, Volume=2618200),
 Row(Date=datetime.date(2019, 7, 17), Open=2007.050049, High=2012.0, Low=1992.030029, Close=1992.030029, AdjClose=1992.030029, Volume=2558800),
 Row(Date=datetime.date(2019, 7, 18), Open=1980.01001, High=1987.5, Low=1951.550049, Close=1977.900024, AdjClose=1977.900024, Volume=3504300),
 Row(Date=datetime.date(2019, 7, 19), Open=1991.209961, High=1996.0, Low=1962.22998, Close=1964.52002, AdjClose=1964.52002, Volume=3185600)]

In [20]:
amazon_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [21]:
amazon_df.count()

253

In [22]:
amazon_df.show()

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
|2019-07-22|1971.140015|     1989.0| 1958.26001|1985.630005|1985.630005|2900000|
|2019-07-23| 1995.98999|1997.790039|1973.130005| 1994.48999| 1994.48999|2703500|
|2019-07-24|1969.300049|2001.300049|1965.869995|2000.810059|2000.810059|2631300|
|2019-07-25|     2001.0|2001.199951|1972.719971|1973.819946|1973.819946|4136500|
|2019-07-26|     1942.0|1950

In [23]:
import pandas as pd
amazon_df.toPandas().head(5)

Unnamed: 0,Date,Open,High,Low,Close,AdjClose,Volume
0,2019-07-15,2021.400024,2022.900024,2001.550049,2020.98999,2020.98999,2981300
1,2019-07-16,2010.579956,2026.319946,2001.219971,2009.900024,2009.900024,2618200
2,2019-07-17,2007.050049,2012.0,1992.030029,1992.030029,1992.030029,2558800
3,2019-07-18,1980.01001,1987.5,1951.550049,1977.900024,1977.900024,3504300
4,2019-07-19,1991.209961,1996.0,1962.22998,1964.52002,1964.52002,3185600


## EXPLORE AND QUERY DATA

In [26]:
google_df = spark.read.csv('GOOG.csv', header=True, inferSchema=True)

In [28]:
from pyspark.sql.functions import year, month
google_df.select(year("Date").alias('yr'), "Close").groupby('yr').avg('Close').sort('yr').show()

+----+------------------+
|  yr|        avg(Close)|
+----+------------------+
|2019|1245.3833654621849|
|2020|1362.8286906865671|
+----+------------------+



In [29]:
help(google_df)

Help on DataFrame in module pyspark.sql.dataframe object:

class DataFrame(pyspark.sql.pandas.map_ops.PandasMapOpsMixin, pyspark.sql.pandas.conversion.PandasConversionMixin)
 |  DataFrame(jdf: py4j.java_gateway.JavaObject, sql_ctx: Union[ForwardRef('SQLContext'), ForwardRef('SparkSession')])
 |  
 |  A distributed collection of data grouped into named columns.
 |  
 |  .. versionadded:: 1.3.0
 |  
 |  .. versionchanged:: 3.4.0
 |      Supports Spark Connect.
 |  
 |  Examples
 |  --------
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SparkSession`:
 |  
 |  >>> people = spark.createDataFrame([
 |  ...     {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
 |  ...     {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
 |  ...     {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
 |  ...     {"deptId": 3, "age": 20,

In [31]:
amazon_df.select(year('Date').alias('year'), month('Date').alias('month'),'Low').groupby('year','month').avg('Low').sort('year','month').explain(True)

== Parsed Logical Plan ==
'Sort ['year ASC NULLS FIRST, 'month ASC NULLS FIRST], true
+- Aggregate [year#248, month#249], [year#248, month#249, avg(Low#52) AS avg(Low)#257]
   +- Project [year(Date#49) AS year#248, month(Date#49) AS month#249, Low#52]
      +- Relation [Date#49,Open#50,High#51,Low#52,Close#53,AdjClose#54,Volume#55] csv

== Analyzed Logical Plan ==
year: int, month: int, avg(Low): double
Sort [year#248 ASC NULLS FIRST, month#249 ASC NULLS FIRST], true
+- Aggregate [year#248, month#249], [year#248, month#249, avg(Low#52) AS avg(Low)#257]
   +- Project [year(Date#49) AS year#248, month(Date#49) AS month#249, Low#52]
      +- Relation [Date#49,Open#50,High#51,Low#52,Close#53,AdjClose#54,Volume#55] csv

== Optimized Logical Plan ==
Sort [year#248 ASC NULLS FIRST, month#249 ASC NULLS FIRST], true
+- Aggregate [year#248, month#249], [year#248, month#249, avg(Low#52) AS avg(Low)#257]
   +- Project [year(Date#49) AS year#248, month(Date#49) AS month#249, Low#52]
      +- Relati