In [1]:
'''
Earlier before spark 2 it was SparkContext which enables communication between cluster and resource manager.
Also it enables access to SQLContext,HiveContext.
For SparkContext we need SparkConfig
Example:
    import org.apache.spark.{SparkContext,SparkConf}
    val sparkConf=new SparkConf().setAppName("testApp").setMaster("yarn")
    val sc=new SparkContext(sparkConf)
    
SQLContext is a spark module for structure data processing and entry point for SparkSQL.
    import org.apache.spark.{SparkContext,SparkConf}
    import org.apache.spark.sql.SqlContext
    val sparkConf=new SparkConf().setAppName("testApp").setMaster("yarn")
    val sc=new sparkContext(sparkConf)
    val sqlcontext=new SQLContext(sc)
    
HIVEContext is used if the spark application requires to communicate with HIVE.
    import org.apache.spark.{SparkContext,SparkConf}
    import org.apache.spark.sql.hive.hiveContext
    val sparkConf=new SparkConf().setAppName("testApp").setMaster("yarn")
    val sc=new sparkContext(sparkConf)
    val hivecontext=new hiveContext(sc)
'''
# SparkSession is imported as it is the entry point for spark cluster.
# For hive support enable hive support

# All the necessary imports
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('testSchema').getOrCreate()
# for use of hive we can use the following
# spark=SparkSession.builder
#     .appName('testSchema')
#     .ebableHiveSupport()
#     .getOrCreate()

In [2]:
df=spark.read.csv("C:\\Users\\Nilakantha\\Documents\\My Folder\\pythonPOCs\\walmart_stock.csv")
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
df.limit(10)

_c0,_c1,_c2,_c3,_c4,_c5,_c6
Date,Open,High,Low,Close,Volume,Adj Close
1/3/2012,59.97,61.06,59.87,60.33,12668800,52.619235
1/4/2012,60.21,60.35,59.47,59.71,9593300,52.078475
1/5/2012,59.35,59.62,58.37,59.42,12768200,51.825539
1/6/2012,59.42,59.45,58.87,59,8069400,51.45922
1/9/2012,59.03,59.55,58.92,59.18,6679300,51.616215
1/10/2012,59.43,59.71,58.98,59.04,6907300,51.494109
1/11/2012,59.06,59.53,59.04,59.4,6365600,51.808098
1/12/2012,59.79,60,59.4,59.5,7236400,51.895316
1/13/2012,59.18,59.61,59.01,59.54,7729300,51.930204


In [1]:
from pyspark.sql.types import (StructField,DateType,TimestampType,DoubleType,IntegerType,StructType)
dataSchema=[StructField('Date',TimestampType(),True),
           StructField('Open',DoubleType(),True),
           StructField('High',DoubleType(),True),
           StructField('Low',DoubleType(),True),
           StructField('Close',DoubleType(),True),
           StructField('Volume',DoubleType(),True),
           StructField('Adj Close',DoubleType(),True)
           ]
FinalSchema=StructType(fields=dataSchema)
df=spark.read.format("csv").option("header","true").option("timestampFormat","mm-dd-yyyy").load("C:\\Users\\Nilakantha\\Documents\\My Folder\\pythonPOCs\\walmart_stock.csv",schema=FinalSchema)
df.show()

ModuleNotFoundError: No module named 'pyspark'

In [4]:
df.show()
df.limit(2)
df.printSchema()

+----+-----+-----+-----+-----+---------+---------+
|Date| Open| High|  Low|Close|   Volume|Adj Close|
+----+-----+-----+-----+-----+---------+---------+
|null|59.97|61.06|59.87|60.33|1.26688E7|52.619235|
|null|60.21|60.35|59.47|59.71|9593300.0|52.078475|
|null|59.35|59.62|58.37|59.42|1.27682E7|51.825539|
|null|59.42|59.45|58.87| 59.0|8069400.0| 51.45922|
|null|59.03|59.55|58.92|59.18|6679300.0|51.616215|
|null|59.43|59.71|58.98|59.04|6907300.0|51.494109|
|null|59.06|59.53|59.04| 59.4|6365600.0|51.808098|
|null|59.79| 60.0| 59.4| 59.5|7236400.0|51.895316|
|null|59.18|59.61|59.01|59.54|7729300.0|51.930204|
|null|59.87|60.11|59.52|59.85|8500000.0|52.200581|
|null|59.79|60.03|59.65|60.01|5911400.0|52.340131|
|null|59.93|60.73|59.75|60.61|9234600.0|52.863447|
|null|60.75|61.25|60.67|61.01|1.03788E7|53.212321|
|null|60.81|60.98|60.51|60.91|7134100.0|53.125104|
|null|60.75| 62.0|60.75|61.39|7362800.0|53.543754|
|null|61.18|61.61|61.04|61.47|5915800.0|53.613531|
|null| 61.8|61.84|60.77|60.97|7

In [5]:
df['Date']
type(df['Date'])

pyspark.sql.column.Column

In [6]:
df.select(['Date'])
# df.select(['Date']).show()
df.select(['Date','Open']).show()


+----+-----+
|Date| Open|
+----+-----+
|null|59.97|
|null|60.21|
|null|59.35|
|null|59.42|
|null|59.03|
|null|59.43|
|null|59.06|
|null|59.79|
|null|59.18|
|null|59.87|
|null|59.79|
|null|59.93|
|null|60.75|
|null|60.81|
|null|60.75|
|null|61.18|
|null| 61.8|
|null|60.86|
|null|60.47|
|null|61.53|
+----+-----+
only showing top 20 rows



In [7]:
df.withColumn('newDate',df['Date']).show()

+----+-----+-----+-----+-----+---------+---------+-------+
|Date| Open| High|  Low|Close|   Volume|Adj Close|newDate|
+----+-----+-----+-----+-----+---------+---------+-------+
|null|59.97|61.06|59.87|60.33|1.26688E7|52.619235|   null|
|null|60.21|60.35|59.47|59.71|9593300.0|52.078475|   null|
|null|59.35|59.62|58.37|59.42|1.27682E7|51.825539|   null|
|null|59.42|59.45|58.87| 59.0|8069400.0| 51.45922|   null|
|null|59.03|59.55|58.92|59.18|6679300.0|51.616215|   null|
|null|59.43|59.71|58.98|59.04|6907300.0|51.494109|   null|
|null|59.06|59.53|59.04| 59.4|6365600.0|51.808098|   null|
|null|59.79| 60.0| 59.4| 59.5|7236400.0|51.895316|   null|
|null|59.18|59.61|59.01|59.54|7729300.0|51.930204|   null|
|null|59.87|60.11|59.52|59.85|8500000.0|52.200581|   null|
|null|59.79|60.03|59.65|60.01|5911400.0|52.340131|   null|
|null|59.93|60.73|59.75|60.61|9234600.0|52.863447|   null|
|null|60.75|61.25|60.67|61.01|1.03788E7|53.212321|   null|
|null|60.81|60.98|60.51|60.91|7134100.0|53.125104|   nul

In [8]:
# df.withColumn('Double-High',df['High']*2)
df.withColumn('Double-High',df['High']*2).show(5)

+----+-----+-----+-----+-----+---------+---------+-----------+
|Date| Open| High|  Low|Close|   Volume|Adj Close|Double-High|
+----+-----+-----+-----+-----+---------+---------+-----------+
|null|59.97|61.06|59.87|60.33|1.26688E7|52.619235|     122.12|
|null|60.21|60.35|59.47|59.71|9593300.0|52.078475|      120.7|
|null|59.35|59.62|58.37|59.42|1.27682E7|51.825539|     119.24|
|null|59.42|59.45|58.87| 59.0|8069400.0| 51.45922|      118.9|
|null|59.03|59.55|58.92|59.18|6679300.0|51.616215|      119.1|
+----+-----+-----+-----+-----+---------+---------+-----------+
only showing top 5 rows



In [9]:
df.withColumnRenamed("High","Large").show(5)

+----+-----+-----+-----+-----+---------+---------+
|Date| Open|Large|  Low|Close|   Volume|Adj Close|
+----+-----+-----+-----+-----+---------+---------+
|null|59.97|61.06|59.87|60.33|1.26688E7|52.619235|
|null|60.21|60.35|59.47|59.71|9593300.0|52.078475|
|null|59.35|59.62|58.37|59.42|1.27682E7|51.825539|
|null|59.42|59.45|58.87| 59.0|8069400.0| 51.45922|
|null|59.03|59.55|58.92|59.18|6679300.0|51.616215|
+----+-----+-----+-----+-----+---------+---------+
only showing top 5 rows



In [10]:
# Filter some value and select only couple of columns and show them
df.filter('Open>59').select('Open','Close').show(5)

# same as filtering on the df as below

df.filter(df['Close']>59).select('Close','Open').show(5)

+-----+-----+
| Open|Close|
+-----+-----+
|59.97|60.33|
|60.21|59.71|
|59.35|59.42|
|59.42| 59.0|
|59.03|59.18|
+-----+-----+
only showing top 5 rows

+-----+-----+
|Close| Open|
+-----+-----+
|60.33|59.97|
|59.71|60.21|
|59.42|59.35|
|59.18|59.03|
|59.04|59.43|
+-----+-----+
only showing top 5 rows



In [11]:
df.filter((df['Close']>59) & (df['Open']<59)).select('Close','Open').show()

+-----+-----+
|Close| Open|
+-----+-----+
|59.08|58.84|
|59.01|58.99|
| 59.4|58.96|
|59.07|58.95|
|59.01|58.96|
|59.19|58.55|
|59.03|58.48|
|59.35|58.95|
|59.04|58.69|
|59.55|58.82|
+-----+-----+



In [12]:
result=df.filter(df['Adj Close']==52.619235).show()

+----+-----+-----+-----+-----+---------+---------+
|Date| Open| High|  Low|Close|   Volume|Adj Close|
+----+-----+-----+-----+-----+---------+---------+
|null|59.97|61.06|59.87|60.33|1.26688E7|52.619235|
+----+-----+-----+-----+-----+---------+---------+



In [13]:
# Take a row and do operations
result=df

In [14]:
df

Date,Open,High,Low,Close,Volume,Adj Close
,59.97,61.06,59.87,60.33,12668800.0,52.619235
,60.21,60.35,59.47,59.71,9593300.0,52.078475
,59.35,59.62,58.37,59.42,12768200.0,51.825539
,59.42,59.45,58.87,59.0,8069400.0,51.45922
,59.03,59.55,58.92,59.18,6679300.0,51.616215
,59.43,59.71,58.98,59.04,6907300.0,51.494109
,59.06,59.53,59.04,59.4,6365600.0,51.808098
,59.79,60.0,59.4,59.5,7236400.0,51.895316
,59.18,59.61,59.01,59.54,7729300.0,51.930204
,59.87,60.11,59.52,59.85,8500000.0,52.200581


In [15]:
df.head(2)

[Row(Date=None, Open=59.97, High=61.06, Low=59.87, Close=60.33, Volume=12668800.0, Adj Close=52.619235),
 Row(Date=None, Open=60.21, High=60.35, Low=59.47, Close=59.71, Volume=9593300.0, Adj Close=52.078475)]

In [16]:
df.head(2)[0]
df.head(2)[0][0]

In [18]:
df.withColumn("current_date",current_date()) \
  .withColumn("current_timestamp",current_timestamp()) \
  .show(truncate=False)


NameError: name 'current_date' is not defined