In [1]:
%load_ext autoreload
%autoreload 2

In [42]:
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkFiles
from pyspark.sql.types import DateType

In [3]:
spark = SparkSession.builder.appName('DeltaLake')\
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.5.0")\
    .getOrCreate()

In [4]:
spark

In [24]:
# Read some CSV data

urls = [
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2014.csv',
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2015.csv',
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2016.csv',
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2017.csv',
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2018.csv',
    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2019.csv',
]
for url in urls:
    spark.sparkContext.addFile(url)

[None, None, None, None, None, None]

In [25]:
df1 = spark.read.csv(SparkFiles.get("pp-20*.csv"), header=False, inferSchema= True)

In [26]:
df1.count()

5993157

In [27]:
df1.show()

+--------------------+------+----------------+--------+---+---+---+---+----+-------------------+------------------+---------------+---------------+-------+----+----+
|                 _c0|   _c1|             _c2|     _c3|_c4|_c5|_c6|_c7| _c8|                _c9|              _c10|           _c11|           _c12|   _c13|_c14|_c15|
+--------------------+------+----------------+--------+---+---+---+---+----+-------------------+------------------+---------------+---------------+-------+----+----+
|{7011B109-CFCA-8E...|280000|2018-06-04 00:00| IP4 5ES|  S|  N|  F|  3|null|     RANDWELL CLOSE|              null|        IPSWICH|        IPSWICH|SUFFOLK|   A|   A|
|{7011B109-CFCB-8E...|280000|2018-05-29 00:00| IP1 4BS|  T|  N|  F|261|null|       NORWICH ROAD|              null|        IPSWICH|        IPSWICH|SUFFOLK|   A|   A|
|{7011B109-CFCC-8E...|170000|2018-04-27 00:00| IP4 4BH|  T|  N|  F| 31|null|        PARADE ROAD|              null|        IPSWICH|        IPSWICH|SUFFOLK|   A|   A|
|{70

In [28]:
# Write the data to DeltaLake format
df1.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("data/delta_example")

In [29]:
# Read the data back from disk
df2 = spark.read.format("delta").load("data/delta_example")

In [32]:
%time df2.count()

CPU times: user 0 ns, sys: 43.9 ms, total: 43.9 ms
Wall time: 612 ms


5993157

In [31]:
df2.show()

+--------------------+------+----------------+--------+---+---+---+--------------------+-------+--------------------+-----------------+-------------------+--------------------+-------------+----+----+
|                 _c0|   _c1|             _c2|     _c3|_c4|_c5|_c6|                 _c7|    _c8|                 _c9|             _c10|               _c11|                _c12|         _c13|_c14|_c15|
+--------------------+------+----------------+--------+---+---+---+--------------------+-------+--------------------+-----------------+-------------------+--------------------+-------------+----+----+
|{8A4719CC-30C8-4B...|170000|2015-07-24 00:00| NE5 5LX|  S|  N|  L|                  78|   null|        LANGDON ROAD|             null|NEWCASTLE UPON TYNE| NEWCASTLE UPON TYNE|TYNE AND WEAR|   A|   A|
|{7B66C3FB-0307-43...|125000|2015-07-09 00:00|CV12 0JG|  S|  N|  F|                 128|   null|           DARK LANE|             null|           BEDWORTH|NUNEATON AND BEDW...| WARWICKSHIRE|   A| 

In [33]:
df2.createOrReplaceTempView("table1")

In [34]:
df3 = spark.sql("SELECT _c1 AS f1, _c3 as f2 from table1")

In [35]:
df3.show()

+------+--------+
|    f1|      f2|
+------+--------+
|170000| NE5 5LX|
|125000|CV12 0JG|
|260000|BN14 0EW|
|815000|KT10 0ND|
|200000|DT11 7NX|
|230500|BH15 3NN|
|139950| BH2 5ST|
|145000| BH4 8ED|
|284950|BH12 3DR|
|465000|BH18 9HU|
|340000| SP8 5DH|
|336000|BH13 7QF|
|245000| BH6 5LU|
|275000|BH20 4QX|
|255000|DT11 9NQ|
|229950| BH6 3PX|
|141000| BH1 4AB|
|396000|DT11 7NY|
|660000| DT1 1QR|
|252000| SP4 6FG|
+------+--------+
only showing top 20 rows



In [41]:
spark.sql("SELECT max(_c2) from table1 ").show()

+----------------+
|        max(_c2)|
+----------------+
|2019-12-26 00:00|
+----------------+



In [40]:
spark.sql("SELECT min(_c2) from table1 ").show()

+----------------+
|        min(_c2)|
+----------------+
|2014-01-01 00:00|
+----------------+



In [39]:
df2.describe().show()

+-------+--------------------+-----------------+----------------+--------+-------+-------+-------+--------------------+--------------------+------------+-----------+--------------+-------+--------------------+-------+-------+
|summary|                 _c0|              _c1|             _c2|     _c3|    _c4|    _c5|    _c6|                 _c7|                 _c8|         _c9|       _c10|          _c11|   _c12|                _c13|   _c14|   _c15|
+-------+--------------------+-----------------+----------------+--------+-------+-------+-------+--------------------+--------------------+------------+-----------+--------------+-------+--------------------+-------+-------+
|  count|             5993157|          5993157|         5993157| 5970195|5993157|5993157|5993157|             5993157|              831218|     5890512|    2209511|       5993157|5992971|             5993157|5993157|5993157|
|   mean|                null|322975.6216006022|            null|    null|   null|   null|   nul

In [43]:
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)



In [62]:
from pyspark.sql.functions import col, unix_timestamp, to_timestamp
df4 = df2.withColumn('transaction_timestamp', to_timestamp(unix_timestamp(col('_c2'), 'yyyy-MM-dd HH:mm').cast("timestamp")))

In [63]:
df4.show()

+--------------------+------+----------------+--------+---+---+---+--------------------+-------+--------------------+-----------------+-------------------+--------------------+-------------+----+----+---------------------+
|                 _c0|   _c1|             _c2|     _c3|_c4|_c5|_c6|                 _c7|    _c8|                 _c9|             _c10|               _c11|                _c12|         _c13|_c14|_c15|transaction_timestamp|
+--------------------+------+----------------+--------+---+---+---+--------------------+-------+--------------------+-----------------+-------------------+--------------------+-------------+----+----+---------------------+
|{8A4719CC-30C8-4B...|170000|2015-07-24 00:00| NE5 5LX|  S|  N|  L|                  78|   null|        LANGDON ROAD|             null|NEWCASTLE UPON TYNE| NEWCASTLE UPON TYNE|TYNE AND WEAR|   A|   A|  2015-07-24 00:00:00|
|{7B66C3FB-0307-43...|125000|2015-07-09 00:00|CV12 0JG|  S|  N|  F|                 128|   null|           D

In [64]:
df4.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)



In [65]:
df2.plot()

AttributeError: 'DataFrame' object has no attribute 'plot'