In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

23/02/10 21:23:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [37]:
df = spark.read.csv("data/nces330_20.csv")

In [38]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [39]:
df.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5']

In [40]:
df.show()

+----+-------+-------------------+------+------------+-----+
| _c0|    _c1|                _c2|   _c3|         _c4|  _c5|
+----+-------+-------------------+------+------------+-----+
|Year|  State|               Type|Length|     Expense|Value|
|2013|Alabama|            Private|4-year|Fees/Tuition|13983|
|2013|Alabama|            Private|4-year|  Room/Board| 8503|
|2013|Alabama|    Public In-State|2-year|Fees/Tuition| 4048|
|2013|Alabama|    Public In-State|4-year|Fees/Tuition| 8073|
|2013|Alabama|    Public In-State|4-year|  Room/Board| 8473|
|2013|Alabama|Public Out-of-State|2-year|Fees/Tuition| 7736|
|2013|Alabama|Public Out-of-State|4-year|Fees/Tuition|20380|
|2013|Alabama|Public Out-of-State|4-year|  Room/Board| 8473|
|2013| Alaska|            Private|4-year|Fees/Tuition|21496|
|2013| Alaska|            Private|4-year|  Room/Board| 8923|
|2013| Alaska|    Public In-State|2-year|Fees/Tuition| 3972|
|2013| Alaska|    Public In-State|4-year|Fees/Tuition| 6317|
|2013| Alaska|    Public

In [41]:
df.describe()

DataFrame[summary: string, _c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]

In [109]:
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType, FloatType, LongType)

In [43]:
data_schema = [StructField('Year', IntegerType(), True), StructField('State', StringType(), True), 
               StructField('Type', StringType(), True), StructField('Lenght', StringType(), True),
               StructField('Expense', StringType(), True), StructField('Value', IntegerType(), True)
              ]

In [44]:
final_struct = StructType(fields=data_schema)

In [45]:
df = spark.read.csv("data/nces330_20.csv", schema=final_struct)
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Lenght: string (nullable = true)
 |-- Expense: string (nullable = true)
 |-- Value: integer (nullable = true)



In [46]:
type(df["Year"])

pyspark.sql.column.Column

In [47]:
type(df.select("Year"))


pyspark.sql.dataframe.DataFrame

In [48]:
print(df.head(2)[0])

Row(Year=None, State='State', Type='Type', Lenght='Length', Expense='Expense', Value=None)


In [49]:
df.select(['Year', 'Value'])

DataFrame[Year: int, Value: int]

In [50]:
df.withColumn('Price UAH', df['Value']*40.2).show()

+----+-------+-------------------+------+------------+-----+------------------+
|Year|  State|               Type|Lenght|     Expense|Value|         Price UAH|
+----+-------+-------------------+------+------------+-----+------------------+
|null|  State|               Type|Length|     Expense| null|              null|
|2013|Alabama|            Private|4-year|Fees/Tuition|13983| 562116.6000000001|
|2013|Alabama|            Private|4-year|  Room/Board| 8503|341820.60000000003|
|2013|Alabama|    Public In-State|2-year|Fees/Tuition| 4048|          162729.6|
|2013|Alabama|    Public In-State|4-year|Fees/Tuition| 8073|324534.60000000003|
|2013|Alabama|    Public In-State|4-year|  Room/Board| 8473|340614.60000000003|
|2013|Alabama|Public Out-of-State|2-year|Fees/Tuition| 7736|          310987.2|
|2013|Alabama|Public Out-of-State|4-year|Fees/Tuition|20380|          819276.0|
|2013|Alabama|Public Out-of-State|4-year|  Room/Board| 8473|340614.60000000003|
|2013| Alaska|            Private|4-year

In [54]:
# register DF as a temporary SQL VIEW
df.createOrReplaceTempView("uni")
results = spark.sql("select * from uni")
results.show()

+----+-------+-------------------+------+------------+-----+
|Year|  State|               Type|Lenght|     Expense|Value|
+----+-------+-------------------+------+------------+-----+
|null|  State|               Type|Length|     Expense| null|
|2013|Alabama|            Private|4-year|Fees/Tuition|13983|
|2013|Alabama|            Private|4-year|  Room/Board| 8503|
|2013|Alabama|    Public In-State|2-year|Fees/Tuition| 4048|
|2013|Alabama|    Public In-State|4-year|Fees/Tuition| 8073|
|2013|Alabama|    Public In-State|4-year|  Room/Board| 8473|
|2013|Alabama|Public Out-of-State|2-year|Fees/Tuition| 7736|
|2013|Alabama|Public Out-of-State|4-year|Fees/Tuition|20380|
|2013|Alabama|Public Out-of-State|4-year|  Room/Board| 8473|
|2013| Alaska|            Private|4-year|Fees/Tuition|21496|
|2013| Alaska|            Private|4-year|  Room/Board| 8923|
|2013| Alaska|    Public In-State|2-year|Fees/Tuition| 3972|
|2013| Alaska|    Public In-State|4-year|Fees/Tuition| 6317|
|2013| Alaska|    Public

In [55]:
results = spark.sql("select * from uni where Value>20000 and State='Alaska'")
results.show()


+----+------+-------------------+------+------------+-----+
|Year| State|               Type|Lenght|     Expense|Value|
+----+------+-------------------+------+------------+-----+
|2013|Alaska|            Private|4-year|Fees/Tuition|21496|
|2014|Alaska|            Private|4-year|Fees/Tuition|20943|
|2016|Alaska|Public Out-of-State|4-year|Fees/Tuition|20463|
|2017|Alaska|Public Out-of-State|4-year|Fees/Tuition|21431|
|2018|Alaska|Public Out-of-State|4-year|Fees/Tuition|21284|
|2019|Alaska|Public Out-of-State|4-year|Fees/Tuition|24454|
|2020|Alaska|Public Out-of-State|4-year|Fees/Tuition|26767|
|2021|Alaska|Public Out-of-State|4-year|Fees/Tuition|25535|
+----+------+-------------------+------+------------+-----+



In [64]:
df.filter("Value > 20000").select(['Year', 'State', 'Value']).show()

+----+--------------------+-----+
|Year|               State|Value|
+----+--------------------+-----+
|2013|             Alabama|20380|
|2013|              Alaska|21496|
|2013|             Arizona|21201|
|2013|          California|28345|
|2013|          California|30765|
|2013|            Colorado|25470|
|2013|         Connecticut|35336|
|2013|         Connecticut|26688|
|2013|            Delaware|26228|
|2013|District of Columbia|35524|
|2013|             Florida|20155|
|2013|             Georgia|22456|
|2013|             Georgia|22393|
|2013|              Hawaii|23614|
|2013|            Illinois|26299|
|2013|            Illinois|26873|
|2013|             Indiana|26794|
|2013|             Indiana|26538|
|2013|                Iowa|23019|
|2013|              Kansas|20852|
+----+--------------------+-----+
only showing top 20 rows



In [69]:
df.filter((df["Value"] > 20000) & ~(df["State"] == "Alaska")).select(['Year', 'State', 'Value']).show()

+----+--------------------+-----+
|Year|               State|Value|
+----+--------------------+-----+
|2013|             Alabama|20380|
|2013|             Arizona|21201|
|2013|          California|28345|
|2013|          California|30765|
|2013|            Colorado|25470|
|2013|         Connecticut|35336|
|2013|         Connecticut|26688|
|2013|            Delaware|26228|
|2013|District of Columbia|35524|
|2013|             Florida|20155|
|2013|             Georgia|22456|
|2013|             Georgia|22393|
|2013|              Hawaii|23614|
|2013|            Illinois|26299|
|2013|            Illinois|26873|
|2013|             Indiana|26794|
|2013|             Indiana|26538|
|2013|                Iowa|23019|
|2013|              Kansas|20852|
|2013|            Kentucky|20639|
+----+--------------------+-----+
only showing top 20 rows



In [71]:
df.groupBy("State")

<pyspark.sql.group.GroupedData at 0x1090e7810>

In [73]:
df.groupBy("State").mean("Value").show()

+--------------------+------------------+
|               State|        avg(Value)|
+--------------------+------------------+
|                Utah| 8918.549295774648|
|              Hawaii| 12524.30985915493|
|           Minnesota|12642.295774647888|
|                Ohio|13620.802816901409|
|            Arkansas| 10233.81690140845|
|              Oregon| 15533.43661971831|
|               Texas| 12596.38028169014|
|        North Dakota|  9227.74647887324|
|        Pennsylvania|16368.957746478873|
|         Connecticut|17435.211267605635|
|            Nebraska| 10701.81690140845|
|             Vermont| 18596.49295774648|
|              Nevada|           12710.6|
|               State|              null|
|          Washington|14730.028169014084|
|            Illinois|15167.633802816901|
|            Oklahoma|11215.225352112677|
|District of Columbia|           18880.2|
|            Delaware|13834.622950819672|
|              Alaska|11629.174603174602|
+--------------------+------------

In [80]:
df.agg({"Value": "max"}).show()

+----------+
|max(Value)|
+----------+
|     49152|
+----------+



In [104]:
from pyspark.sql.functions import countDistinct, avg, stddev, mean
df.select(countDistinct("State").alias('State Name')).show()

+----------+
|State Name|
+----------+
|        52|
+----------+



In [87]:
df.na.drop(thresh=10)
df.na.drop(how="any") # default: any missing values are dropped 
df.na.drop(subset=['State']) # concider any of this subset to be missing and drop it. 
df.na.fill("FILL VALUE")

DataFrame[Year: int, State: string, Type: string, Lenght: string, Expense: string, Value: int]

In [102]:
m_value = df.select(avg(df["Value"])).collect()
m_value

[Row(avg(Value)=13027.72012401353)]

In [105]:
m_value = df.select(mean("Value")).collect()
m_value

[Row(avg(Value)=13027.72012401353)]

In [114]:
data_schema = StructType(fields=
                         [
                             StructField('Timestamp', StringType(), True), 
                             StructField('Date', StringType(), True), 
                             StructField('Symbol', StringType(), True),
                             StructField('Open', FloatType(), True), 
                             StructField('Hight', FloatType(), True),
                             StructField('Low', FloatType(), True), 
                             StructField('Volume', FloatType(), True)            
                          ]
                        )
time_df = spark.read.csv("data/gemini_BTCUSD_2020_1min.csv", schema=data_schema)
time_df.printSchema()


root
 |-- Timestamp: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- Hight: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Volume: float (nullable = true)



In [117]:
from pyspark.sql.functions import dayofmonth, dayofyear, hour, month, year, format_number, date_format

In [115]:
time_df.show()

+--------------+---------------+------+--------+--------+--------+--------+
|     Timestamp|           Date|Symbol|    Open|   Hight|     Low|  Volume|
+--------------+---------------+------+--------+--------+--------+--------+
|Unix Timestamp|           Date|Symbol|    null|    null|    null|    null|
|   1.61888E+12| 4/20/2021 0:02|BTCUSD|55717.47| 55723.0|55541.69|55541.69|
|   1.61888E+12| 4/20/2021 0:01|BTCUSD|55768.94|55849.82|55711.74|55717.47|
|   1.61888E+12| 4/20/2021 0:00|BTCUSD|55691.79|55793.15|55691.79|55768.94|
|   1.61888E+12|4/19/2021 23:59|BTCUSD|55777.86|55777.86|55677.92|55691.79|
|   1.61888E+12|4/19/2021 23:58|BTCUSD| 55803.5|55823.88|55773.08|55777.86|
|   1.61888E+12|4/19/2021 23:57|BTCUSD|55690.64|55822.91|55682.56| 55803.5|
|   1.61888E+12|4/19/2021 23:56|BTCUSD|55624.69|55713.02|55624.63|55690.64|
|   1.61888E+12|4/19/2021 23:55|BTCUSD|55651.82|55675.92|55621.58|55624.69|
|   1.61888E+12|4/19/2021 23:54|BTCUSD|55688.08|55730.21| 55641.4|55651.82|
|   1.61888E

In [126]:
time_df.na.drop()
time_df.select(month(time_df["Date"])).show()

+-----------+
|month(Date)|
+-----------+
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
+-----------+
only showing top 20 rows



In [139]:
import re
from pyspark.sql.functions import regexp_replace

#cols=[re.sub(r'(^_|_$)','',f.replace("/","_")) for f in time_df.columns]
#time_df.toDF(*cols).show()
time_df.withColumn("Date", regexp_replace(time_df["Date"], "\\/", "-")).show()




+--------------+---------------+------+--------+--------+--------+--------+
|     Timestamp|           Date|Symbol|    Open|   Hight|     Low|  Volume|
+--------------+---------------+------+--------+--------+--------+--------+
|Unix Timestamp|           Date|Symbol|    null|    null|    null|    null|
|   1.61888E+12| 4-20-2021 0:02|BTCUSD|55717.47| 55723.0|55541.69|55541.69|
|   1.61888E+12| 4-20-2021 0:01|BTCUSD|55768.94|55849.82|55711.74|55717.47|
|   1.61888E+12| 4-20-2021 0:00|BTCUSD|55691.79|55793.15|55691.79|55768.94|
|   1.61888E+12|4-19-2021 23:59|BTCUSD|55777.86|55777.86|55677.92|55691.79|
|   1.61888E+12|4-19-2021 23:58|BTCUSD| 55803.5|55823.88|55773.08|55777.86|
|   1.61888E+12|4-19-2021 23:57|BTCUSD|55690.64|55822.91|55682.56| 55803.5|
|   1.61888E+12|4-19-2021 23:56|BTCUSD|55624.69|55713.02|55624.63|55690.64|
|   1.61888E+12|4-19-2021 23:55|BTCUSD|55651.82|55675.92|55621.58|55624.69|
|   1.61888E+12|4-19-2021 23:54|BTCUSD|55688.08|55730.21| 55641.4|55651.82|
|   1.61888E

In [142]:
time_df.na.drop().select(hour(time_df["Date"])).show()

+----------+
|hour(Date)|
+----------+
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
+----------+
only showing top 20 rows



In [148]:
from pyspark.sql.functions import col, to_date

#simple regex, watch out for the white space
time_df = time_df.withColumn('Date', regexp_replace('Date', '( \d+:\d+)', ''))
#transform the date-column
time_df = time_df.withColumn('Date', to_date(col('Date'), "M/d/y")).withColumn('Month', month(col('Date'))).withColumn('Year', year(col('Date')))
time_df.show()
time_df.printSchema()

+--------------+----------+------+--------+--------+--------+--------+-----+----+
|     Timestamp|      Date|Symbol|    Open|   Hight|     Low|  Volume|Month|Year|
+--------------+----------+------+--------+--------+--------+--------+-----+----+
|Unix Timestamp|      null|Symbol|    null|    null|    null|    null| null|null|
|   1.61888E+12|2021-04-20|BTCUSD|55717.47| 55723.0|55541.69|55541.69|    4|2021|
|   1.61888E+12|2021-04-20|BTCUSD|55768.94|55849.82|55711.74|55717.47|    4|2021|
|   1.61888E+12|2021-04-20|BTCUSD|55691.79|55793.15|55691.79|55768.94|    4|2021|
|   1.61888E+12|2021-04-19|BTCUSD|55777.86|55777.86|55677.92|55691.79|    4|2021|
|   1.61888E+12|2021-04-19|BTCUSD| 55803.5|55823.88|55773.08|55777.86|    4|2021|
|   1.61888E+12|2021-04-19|BTCUSD|55690.64|55822.91|55682.56| 55803.5|    4|2021|
|   1.61888E+12|2021-04-19|BTCUSD|55624.69|55713.02|55624.63|55690.64|    4|2021|
|   1.61888E+12|2021-04-19|BTCUSD|55651.82|55675.92|55621.58|55624.69|    4|2021|
|   1.61888E+12|

In [149]:
time_df.na.drop().select(month(time_df["Date"])).show()

+-----------+
|month(Date)|
+-----------+
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
+-----------+
only showing top 20 rows



In [156]:
res = time_df.groupBy("Year").mean("Low").show()

+----+-----------------+
|Year|         avg(Low)|
+----+-----------------+
|null|             null|
|2021|55796.55568181818|
+----+-----------------+



In [163]:
for row in time_df.head(3):
    print(f"{row}\n")

Row(Timestamp='Unix Timestamp', Date=None, Symbol='Symbol', Open=None, Hight=None, Low=None, Volume=None, Month=None, Year=None)

Row(Timestamp='1.61888E+12', Date=datetime.date(2021, 4, 20), Symbol='BTCUSD', Open=55717.46875, Hight=55723.0, Low=55541.69140625, Volume=55541.69140625, Month=4, Year=2021)

Row(Timestamp='1.61888E+12', Date=datetime.date(2021, 4, 20), Symbol='BTCUSD', Open=55768.94140625, Hight=55849.8203125, Low=55711.73828125, Volume=55717.46875, Month=4, Year=2021)



In [172]:
from pyspark.sql.functions import format_number
result = time_df.select(time_df['Timestamp'], format_number(time_df['Timestamp'].cast('float'), 2).alias("New Timestamp")).show()

+--------------+--------------------+
|     Timestamp|       New Timestamp|
+--------------+--------------------+
|Unix Timestamp|                null|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
|   1.61888E+12|1,618,879,971,328.00|
+--------------+--------------------+
only showing top 20 rows

