In [1]:
# install the dependencies
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz
!tar xf spark-3.1.3-bin-hadoop3.2.tgz
!pip -q install findspark

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.7.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'


import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

Dataset:
https://www.kaggle.com/datasets/ruchi798/data-science-job-salaries

In [3]:
# Show some settings of spark
print(SparkConf().getAll())

[('spark.jars', 'file:///root/.ivy2/jars/io.delta_delta-core_2.12-0.7.0.jar,file:///root/.ivy2/jars/org.antlr_antlr4-4.7.jar,file:///root/.ivy2/jars/org.antlr_antlr4-runtime-4.7.jar,file:///root/.ivy2/jars/org.antlr_antlr-runtime-3.5.2.jar,file:///root/.ivy2/jars/org.antlr_ST4-4.0.8.jar,file:///root/.ivy2/jars/org.abego.treelayout_org.abego.treelayout.core-1.0.3.jar,file:///root/.ivy2/jars/org.glassfish_javax.json-1.0.4.jar,file:///root/.ivy2/jars/com.ibm.icu_icu4j-58.2.jar'), ('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension'), ('spark.repl.local.jars', 'file:///root/.ivy2/jars/io.delta_delta-core_2.12-0.7.0.jar,file:///root/.ivy2/jars/org.antlr_antlr4-4.7.jar,file:///root/.ivy2/jars/org.antlr_antlr4-runtime-4.7.jar,file:///root/.ivy2/jars/org.antlr_antlr-runtime-3.5.2.jar,file:///root/.ivy2/jars/org.antlr_ST4-4.0.8.jar,file:///root/.ivy2/jars/org.abego.treelayout_org.abego.treelayout.core-1.0.3.jar,file:///root/.ivy2/jars/org.glassfish_javax.json-1.0.4.jar,file:///roo

In [27]:
# open specific file 
df = spark.read.csv('/lake/csv/ds_salaries.csv', header = True, inferSchema = True)

In [28]:
# open all csv files in the lake folder
df = spark.read \
          .format('csv') \
          .option('inferSchema', 'true') \
          .option('header', 'true') \
          .load('/lake/csv/*.csv')

In [29]:
df.show(5)

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist| 70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer| 85000|            GBP|       109024|                GB|          50|              GB|

In [None]:
# save csv
df.write.format('csv') \
        .mode('overwrite') \
        .partitionBy("experience_level") \
        .save("/home")

In [32]:
# save parquet
df.write.format('parquet') \
        .mode('overwrite') \
        .partitionBy('employee_residence',"experience_level") \
        .save("/lake/parquet")

In [98]:
# open parquet
df = spark.read.format('parquet').load('/lake/parquet')

In [100]:
# save orc
df.write.format('orc').mode('overwrite').save('/lake/orc')

In [108]:
# save delta
df.write.format('delta').mode('overwrite').save('/lake/delta')

In [110]:
# open delta using the versionAsOf
df= spark.read.format('delta').option('versionAsOf', '0').load('/lake/delta')

In [117]:
# adding duplicates data in the delta table and reading using Time Travel feature with timestampAsOf
df.write.format('delta').mode('append').save('/lake/delta')
df = spark.read.format('delta').option('timestampAsOf', '2022-11-07 12:40').load('/lake/delta')

In [119]:
# the table was create in the 2022-11-07 12:38:46.635