In [1]:
//Generate Spark Dataframe from a RDD in Scala
import scala.util.Random
val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x,Random.nextInt(100)* x))
val DF = rdd.toDF("key","value")

Intitializing Scala interpreter ...

Spark Web UI available at http://LAPTOP-MKM81J0P:4040
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1599499211351)
SparkSession available as 'spark'


import scala.util.Random
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:26
DF: org.apache.spark.sql.DataFrame = [key: int, value: int]


In [2]:
//Check for the structure
DF.printSchema

root
 |-- key: integer (nullable = false)
 |-- value: integer (nullable = false)



In [4]:
//View it as a Spark DataFrame
DF.show(5)

+---+-----+
|key|value|
+---+-----+
|  1|   10|
|  2|   94|
|  3|   66|
|  4|  268|
|  5|  105|
+---+-----+
only showing top 5 rows



In [5]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val peopleRDD = spark.sparkContext.parallelize(Array(Row(1L, "Shabbir", 30L),Row(2L, "Vivek", 25L)))
val schema = StructType(Array(
StructField("id", LongType, true),
StructField("name", StringType, true),
StructField("age", LongType, true)
))
val peopleDF = spark.createDataFrame(peopleRDD, schema)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
peopleRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[8] at parallelize at <console>:27
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true), StructField(name,StringType,true), StructField(age,LongType,true))
peopleDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 1 more field]


In [6]:
peopleDF.printSchema

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [7]:
peopleDF.show

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|Shabbir| 30|
|  2|  Vivek| 25|
+---+-------+---+



In [8]:
//Spark Dataframe using the range function
val df1 = spark.range(5).toDF("num").show

+---+
|num|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



df1: Unit = ()


In [9]:
//Convert Sequence key-values to spark DataFrame
val movies = Seq(("Damon, Matt", "The Bourne Ultimatum", 2007L),("Damon, Matt", "Good Will Hunting", 1997L))
val moviesDF = movies.toDF("actor", "title", "year")

movies: Seq[(String, String, Long)] = List((Damon, Matt,The Bourne Ultimatum,2007), (Damon, Matt,Good Will Hunting,1997))
moviesDF: org.apache.spark.sql.DataFrame = [actor: string, title: string ... 1 more field]


In [10]:
moviesDF.printSchema

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = false)



In [11]:
moviesDF.show

+-----------+--------------------+----+
|      actor|               title|year|
+-----------+--------------------+----+
|Damon, Matt|The Bourne Ultimatum|2007|
|Damon, Matt|   Good Will Hunting|1997|
+-----------+--------------------+----+



In [12]:
//Initialize Dataframe reader and to csv local system
val df = spark.read.option("header","true").option("inferSchema","true")
.csv("C:/Users/sgove/OneDrive/Desktop/all.csv")
df.printSchema

root
 |-- SYMBOL: string (nullable = true)
 |-- SERIES: string (nullable = true)
 |-- OPEN: double (nullable = true)
 |-- HIGH: double (nullable = true)
 |-- LOW: double (nullable = true)
 |-- CLOSE: double (nullable = true)
 |-- LAST: double (nullable = true)
 |-- PREVCLOSE: double (nullable = true)
 |-- TOTTRDQTY: integer (nullable = true)
 |-- TOTTRDVAL: double (nullable = true)
 |-- TIMESTAMP: string (nullable = true)
 |-- _c11: integer (nullable = true)



df: org.apache.spark.sql.DataFrame = [SYMBOL: string, SERIES: string ... 10 more fields]


In [13]:
df.count

res8: Long = 1893059


In [14]:
df.columns

res9: Array[String] = Array(SYMBOL, SERIES, OPEN, HIGH, LOW, CLOSE, LAST, PREVCLOSE, TOTTRDQTY, TOTTRDVAL, TIMESTAMP, _c11)


In [15]:
df.dtypes

res10: Array[(String, String)] = Array((SYMBOL,StringType), (SERIES,StringType), (OPEN,DoubleType), (HIGH,DoubleType), (LOW,DoubleType), (CLOSE,DoubleType), (LAST,DoubleType), (PREVCLOSE,DoubleType), (TOTTRDQTY,IntegerType), (TOTTRDVAL,DoubleType), (TIMESTAMP,StringType), (_c11,IntegerType))


In [16]:
//Apply SQL like functions on spark dataframe with functional programming
df.select("OPEN","HIGH","CLOSE","LOW").show()

+------+-------+-------+-------+
|  OPEN|   HIGH|  CLOSE|    LOW|
+------+-------+-------+-------+
| 37.75|  37.75|  37.45|  36.35|
| 43.75|   45.3|   44.9|  43.75|
|3374.0|3439.95| 3397.5| 3338.0|
| 281.8| 294.45|  289.2|  279.8|
| 127.0|  132.0|  131.3| 126.55|
|  50.0|   50.0|  49.25|   49.0|
| 58.45|  58.45|  56.65|   56.6|
| 620.0| 645.95|  643.3|  617.0|
| 796.8|  796.8|  785.2| 777.35|
|1379.0| 1379.0| 1353.2|1335.05|
|129.55|  130.8|  130.0| 128.35|
| 367.0|  374.0|  370.0|  335.6|
|  15.0|   16.0|  15.95|   15.0|
|816.45|  844.7| 824.85|  812.4|
|  14.4|  15.25|  15.05|   14.2|
|1070.0| 1098.0|1091.85|1069.95|
|  43.2|   44.9|   44.5|   42.0|
| 228.0|  228.0|  224.5|  223.1|
| 58.75|   62.7|  58.35|  58.35|
| 666.0|  668.2|  661.0|  652.3|
+------+-------+-------+-------+
only showing top 20 rows



In [17]:
df.filter("HIGH > 2000").show()

+----------+------+-------+-------+-------+-------+-------+---------+---------+---------------+-----------+----+
|    SYMBOL|SERIES|   OPEN|   HIGH|    LOW|  CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP|_c11|
+----------+------+-------+-------+-------+-------+-------+---------+---------+---------------+-----------+----+
|   3MINDIA|    EQ| 3374.0|3439.95| 3338.0| 3397.5| 3400.0|   3364.7|      871|     2941547.35|01-APR-2011|null|
|ASIANPAINT|    EQ|2568.95|2568.95| 2511.0|2520.55| 2515.3|   2525.8|    57384|  1.450865968E8|01-APR-2011|null|
|   AVENTIS|    EQ| 2011.0| 2018.0|1976.15|1993.55| 1983.0|  2011.95|      569|      1138432.5|01-APR-2011|null|
|  AXISGOLD|    EQ| 2075.0|2086.15| 2053.7| 2072.9| 2070.0|   2066.0|      476|       982519.7|01-APR-2011|null|
|      BHEL|    EQ| 2077.4| 2134.7| 2062.0| 2127.7| 2124.0|  2062.65|   810279|1.71175017195E9|01-APR-2011|null|
|  BOSCHLTD|    EQ|6501.05| 6800.0|6501.05|6659.15| 6674.0|   6708.5|     3139|  2.104645035E7|0

In [18]:
df.where("HIGH < 2000").show()

+----------+------+------+------+-------+-------+-------+---------+---------+--------------+-----------+----+
|    SYMBOL|SERIES|  OPEN|  HIGH|    LOW|  CLOSE|   LAST|PREVCLOSE|TOTTRDQTY|     TOTTRDVAL|  TIMESTAMP|_c11|
+----------+------+------+------+-------+-------+-------+---------+---------+--------------+-----------+----+
| 20MICRONS|    EQ| 37.75| 37.75|  36.35|  37.45|   37.3|    37.15|    38638|     1420968.1|01-APR-2011|null|
|3IINFOTECH|    EQ| 43.75|  45.3|  43.75|   44.9|   44.8|    43.85|  1239690| 5.531120435E7|01-APR-2011|null|
|    A2ZMES|    EQ| 281.8|294.45|  279.8|  289.2|  287.2|    281.3|   140643|  4.02640755E7|01-APR-2011|null|
|AARTIDRUGS|    EQ| 127.0| 132.0| 126.55|  131.3|  130.6|    127.6|     2972|      384468.2|01-APR-2011|null|
|  AARTIIND|    EQ|  50.0|  50.0|   49.0|  49.25|  49.35|    49.05|    24056|    1188195.85|01-APR-2011|null|
| AARVEEDEN|    EQ| 58.45| 58.45|   56.6|  56.65|   56.6|    56.55|      123|        7000.1|01-APR-2011|null|
|      ABA

In [19]:
df.select("OPEN","HIGH").describe().show()

+-------+------------------+------------------+
|summary|              OPEN|              HIGH|
+-------+------------------+------------------+
|  count|           1893059|           1893059|
|   mean| 361.2689533078383| 366.7326563885202|
| stddev|1747.4329870382217|1763.9932112529843|
|    min|              0.05|              0.05|
|    max|          116480.0|          116490.0|
+-------+------------------+------------------+



In [20]:
// register movies as global temporary view called movies_g
df.createOrReplaceTempView("stocks")

In [21]:
//Use Proper SQL queries on Spark DataSet & DataFrames
spark.sql("select * from stocks where SYMBOL IN ('TCS','GEOMETRIC')").show(5)

+---------+------+------+-------+-------+-------+------+---------+---------+---------------+-----------+-----+
|   SYMBOL|SERIES|  OPEN|   HIGH|    LOW|  CLOSE|  LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP| _c11|
+---------+------+------+-------+-------+-------+------+---------+---------+---------------+-----------+-----+
|GEOMETRIC|    EQ| 62.35|   64.5|   61.4|  63.25| 63.25|     61.3|    82246|     5179345.65|01-APR-2011| null|
|      TCS|    EQ|1185.0|1198.75|1172.55|1180.15|1181.9|   1183.9|   899812|1.06351115885E9|01-APR-2011| null|
|GEOMETRIC|    EQ| 100.7|  105.5|   99.1|  103.5|102.55|    100.2|   124482|   1.27532668E7|01-APR-2013| 2690|
|      TCS|    EQ|1565.0| 1573.7|1551.25|1556.85|1552.1|  1575.75|   484406|  7.564599597E8|01-APR-2013|19638|
|GEOMETRIC|    EQ| 116.0|  121.0|  116.0|  120.0| 120.2|   115.55|   644060|     7.701543E7|01-APR-2014| 6430|
+---------+------+------+-------+-------+-------+------+---------+---------+---------------+-----------+-----+
o

In [22]:
//Check for Pearson Correlation
spark.sql("SELECT CORR(CLOSE,OPEN) AS PEARSON_CORR_CLOSE FROM stocks").show

+------------------+
|PEARSON_CORR_CLOSE|
+------------------+
|0.9997560995948557|
+------------------+



In [23]:
//Save a Spark DataFrame as a CSV file on your System
val a = spark.sql("select * from stocks where SYMBOL IN ('TCS','GEOMETRIC')")
a.write.format("csv").option("sep", ",").save("C:/Users/sgove/OneDrive/Desktop/DataFrametocsv")

a: org.apache.spark.sql.DataFrame = [SYMBOL: string, SERIES: string ... 10 more fields]
