## Archiving Processed Files

Let us go through the details of reading data and writing it to a target location using the Spark Structured Streaming APIs, while archiving the files that have already been processed.

* As part of data pipelines, we typically delete or archive the files that have already been processed.
* As part of the archival process, we typically move the files to a different location, often on low-cost storage.
* The file source in Spark Structured Streaming supports both `delete` and `archive` through the `cleanSource` option.
* By default `cleanSource` is set to `off`. We can set it to either `delete` or `archive`.
* When we use `archive`, we also need to set the archive folder using `sourceArchiveDir`. We need to pass the fully qualified path of the archive folder to `sourceArchiveDir`.
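
The rules in the list above can be captured in a small helper. This is only a sketch: `clean_source_options` is a hypothetical function name introduced here for illustration, not part of Spark. It builds the option dictionary and rejects an `archive` setting that has no archive folder.

```python
def clean_source_options(mode="off", archive_dir=None):
    """Build file-source options for cleaning up processed files.

    Hypothetical helper for illustration; only the option names
    `cleanSource` and `sourceArchiveDir` come from Spark.
    """
    valid_modes = {"off", "delete", "archive"}
    if mode not in valid_modes:
        raise ValueError(f"cleanSource must be one of {sorted(valid_modes)}")

    options = {"cleanSource": mode}
    if mode == "archive":
        # An archive folder is mandatory when archiving.
        if not archive_dir:
            raise ValueError("sourceArchiveDir is required when cleanSource is 'archive'")
        options["sourceArchiveDir"] = archive_dir
    return options


print(clean_source_options("archive", "/user/someuser/archive/ghactivity/"))
```

The resulting dictionary could then be passed to the reader with `options(**...)` in place of the individual `option` calls.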

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config('spark.sql.warehouse.dir', f'/user/{username}/warehouse'). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Incremental Loads using Spark Structured Streaming'). \
    master('yarn'). \
    getOrCreate()

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '8')
spark.conf.set('spark.sql.streaming.schemaInference', 'true')

In [3]:
ghactivity_df = spark. \
    readStream. \
    format('json'). \
    option('maxFilesPerTrigger', 8). \
    option('cleanSource', 'archive'). \
    option('sourceArchiveDir', f'/user/{username}/github/streaming/landing/archive/ghactivity/'). \
    load(f'/user/{username}/github/streaming/landing/ghactivity/')

In [4]:
from pyspark.sql.functions import year, month, dayofmonth, lpad

In [5]:
ghactivity_df = ghactivity_df. \
    withColumn('created_year', year('created_at')). \
    withColumn('created_month', lpad(month('created_at'), 2, '0')). \
    withColumn('created_dayofmonth', lpad(dayofmonth('created_at'), 2, '0'))

In [6]:
ghactivity_df. \
    writeStream. \
    partitionBy('created_year', 'created_month', 'created_dayofmonth'). \
    format('parquet'). \
    option("checkpointLocation", f"/user/{username}/github/streaming/bronze/checkpoint/ghactivity"). \
    option("path", f"/user/{username}/github/streaming/bronze/data/ghactivity"). \
    trigger(processingTime='120 seconds'). \
    start()

# If a micro-batch completes in less than 120 seconds, the next one waits until the 120-second interval has elapsed.
# If a micro-batch takes more than 120 seconds to complete, the next one starts as soon as it finishes.

<pyspark.sql.streaming.StreamingQuery at 0x7fa3d8e804e0>
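
The two comments on the `processingTime` trigger can be illustrated with a simplified timing model. This sketch only mirrors those two rules; it does not reproduce how Spark schedules triggers internally, and `batch_start_times` is a hypothetical name used for illustration.

```python
def batch_start_times(durations, interval=120):
    """Return the start time (in seconds) of each micro-batch.

    Simplified model: the gap between two consecutive starts is the
    trigger interval, unless the batch ran longer than the interval,
    in which case the next batch starts right after it finishes.
    """
    starts = []
    current = 0
    for duration in durations:
        starts.append(current)
        current += max(duration, interval)
    return starts


# A 30-second batch, then a 200-second batch, then a 60-second batch.
print(batch_start_times([30, 200, 60]))  # [0, 120, 320]
```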

* Validating the checkpoint location. We can see multiple folders. These folders contain the metadata that Spark maintains for the checkpoint, such as the offsets and commits of each batch and the list of source files already processed.

In [7]:
!hdfs dfs -ls /user/${USER}/github/streaming/bronze/checkpoint/ghactivity

Found 4 items
drwxr-xr-x   - itv007304 supergroup          0 2023-07-29 00:22 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/commits
-rw-r--r--   3 itv007304 supergroup         45 2023-07-14 12:17 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/metadata
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets
drwxr-xr-x   - itv007304 supergroup          0 2023-07-14 12:17 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources


In [15]:
!hdfs dfs -ls -R /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/sources

drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:06 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0
-rw-r--r--   3 itv007304 supergroup       4432 2023-07-14 12:17 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/0
-rw-r--r--   3 itv007304 supergroup       4432 2023-07-25 10:27 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/1
-rw-r--r--   3 itv007304 supergroup       1482 2023-07-29 00:15 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/2
-rw-r--r--   3 itv007304 supergroup       1480 2023-07-29 00:18 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/3
-rw-r--r--   3 itv007304 supergroup       1474 2023-07-29 00:20 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/4
-rw-r--r--   3 itv007304 supergroup       1474 2023-08-02 15:02 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/sources/0/5
-rw-r--r--   3 itv007304 supergroup 

In [10]:
!hdfs dfs -cat /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/sources/0/5

v1
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-0.json.gz","timestamp":1691001538280,"batchId":5}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-1.json.gz","timestamp":1691001548303,"batchId":5}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-2.json.gz","timestamp":1691001558519,"batchId":5}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-3.json.gz","timestamp":1691001567978,"batchId":5}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-4.json.gz","timestamp":1691001576935,"batchId":5}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landin

In [12]:
!hdfs dfs -cat /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/sources/0/6

v1
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-8.json.gz","timestamp":1691001628861,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-9.json.gz","timestamp":1691001642875,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-10.json.gz","timestamp":1691001655772,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-11.json.gz","timestamp":1691001667237,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-12.json.gz","timestamp":1691001679969,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/lan
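
Each file under `sources/0` starts with a version line, followed by one JSON record per source file picked up in that batch. As a minimal sketch, assuming the text of such a file has already been captured into a string, it can be parsed as follows. `parse_source_log` is a hypothetical helper, and the sample contains only the first two records shown above.

```python
import json


def parse_source_log(text):
    """Split a source log into its version line and its JSON records."""
    lines = [line for line in text.splitlines() if line.strip()]
    version = lines[0]
    entries = [json.loads(line) for line in lines[1:]]
    return version, entries


sample = """v1
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-8.json.gz","timestamp":1691001628861,"batchId":6}
{"path":"hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-9.json.gz","timestamp":1691001642875,"batchId":6}
"""

version, entries = parse_source_log(sample)
for entry in entries:
    # Print the batch id next to the file name of each processed file.
    print(entry["batchId"], entry["path"].rsplit("/", 1)[-1])
```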

In [16]:
!hdfs dfs -ls /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/offsets

Found 8 items
-rw-r--r--   3 itv007304 supergroup        471 2023-07-14 12:17 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/0
-rw-r--r--   3 itv007304 supergroup        471 2023-07-25 10:27 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/1
-rw-r--r--   3 itv007304 supergroup        471 2023-07-29 00:15 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/2
-rw-r--r--   3 itv007304 supergroup        471 2023-07-29 00:18 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/3
-rw-r--r--   3 itv007304 supergroup        471 2023-07-29 00:20 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/4
-rw-r--r--   3 itv007304 supergroup        471 2023-08-02 15:02 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/5
-rw-r--r--   3 itv007304 supergroup        471 2023-08-02 15:04 /user/itv007304/github/streaming/bronze/checkpoint/ghactivity/offsets/6
-rw-r--r--   3 itv007304 supergrou

In [17]:
!hdfs dfs -cat /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/offsets/5

v1
{"batchWatermarkMs":0,"batchTimestampMs":1691002921669,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"16"}}
{"logOffset":5}
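
An offsets file has three parts: a version line, a JSON line with batch metadata, and one JSON line per source holding its offset. The sketch below, assuming the file content is available as a string, extracts the batch timestamp and the log offset. `parse_offset_file` is a hypothetical helper, and the `conf` map in the sample is abbreviated to a single entry.

```python
import json
from datetime import datetime, timezone


def parse_offset_file(text):
    """Parse the version, batch metadata, and source offsets of an offsets file."""
    lines = [line for line in text.splitlines() if line.strip()]
    metadata = json.loads(lines[1])
    return {
        "version": lines[0],
        # batchTimestampMs is epoch time in milliseconds.
        "batch_time": datetime.fromtimestamp(
            metadata["batchTimestampMs"] / 1000, tz=timezone.utc
        ),
        "conf": metadata.get("conf", {}),
        "offsets": [json.loads(line) for line in lines[2:]],
    }


sample = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1691002921669,"conf":{"spark.sql.shuffle.partitions":"16"}}
{"logOffset":5}
"""

parsed = parse_offset_file(sample)
print(parsed["batch_time"].isoformat(), parsed["offsets"])
```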

In [None]:
!hdfs dfs -cat /user/${USER}/github/streaming/bronze/checkpoint/ghactivity/offsets/3

* Validating the data location. We should see the data in this location in the Parquet file format, partitioned by year, month, and day of month, as we are just copying the data from the landing folder.

In [18]:
!hdfs dfs -ls /user/${USER}/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07

Found 4 items
drwxr-xr-x   - itv007304 supergroup          0 2023-07-14 12:22 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=11
drwxr-xr-x   - itv007304 supergroup          0 2023-07-25 10:32 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=12
drwxr-xr-x   - itv007304 supergroup          0 2023-07-29 00:20 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=13
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:06 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14
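
The folder names follow the `column=value` layout produced by `partitionBy`. As a small sketch, the partition values can be recovered from such a path with plain string handling. `parse_partition_path` is a hypothetical helper used only for illustration.

```python
def parse_partition_path(path):
    """Extract column=value pairs from a partitioned folder path."""
    partitions = {}
    for segment in path.split("/"):
        if "=" in segment:
            column, value = segment.split("=", 1)
            partitions[column] = value
    return partitions


path = (
    "/user/itv007304/github/streaming/bronze/data/ghactivity/"
    "created_year=2023/created_month=07/created_dayofmonth=14"
)
print(parse_partition_path(path))
```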


In [19]:
!hdfs dfs -ls /user/${USER}/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14

Found 23 items
-rw-r--r--   3 itv007304 supergroup  202134557 2023-08-02 15:07 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14/part-00000-5c5fdbec-b281-4b88-b076-334a3b1b430d.c000.snappy.parquet
-rw-r--r--   3 itv007304 supergroup  219936891 2023-08-02 15:05 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14/part-00000-69fdd9ea-d5d3-467b-8261-590dd2fd3a16.c000.snappy.parquet
-rw-r--r--   3 itv007304 supergroup  184306281 2023-08-02 15:03 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14/part-00000-800f016c-e02d-46e0-833b-d94c631b6805.c000.snappy.parquet
-rw-r--r--   3 itv007304 supergroup  164860809 2023-08-02 15:03 /user/itv007304/github/streaming/bronze/data/ghactivity/created_year=2023/created_month=07/created_dayofmonth=14/part-00001-31149cef-aa29-4b7d-94ec-da9e1a557bd7.c000.snappy.parquet
-rw-r

In [None]:
!hdfs dfs -ls -R /user/${USER}/github/streaming/bronze/data/ghactivity

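* Validating the archive folder. The processed files are moved under the archive folder, and the full path of the original file is preserved beneath it.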

In [21]:
!hdfs dfs -ls -R /user/${USER}/github/streaming/landing/archive/ghactivity

drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user/itv007304
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user/itv007304/github
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user/itv007304/github/streaming
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user/itv007304/github/streaming/landing
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/streaming/landing/archive/ghactivity/user/itv007304/github/streaming/landing/ghactivity
drwxr-xr-x   - itv007304 supergroup          0 2023-08-02 15:02 /user/itv007304/github/stre
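
The listing shows the original source path recreated under the archive folder. A minimal sketch of that mapping follows, assuming the archived location is simply the archive folder followed by the full path of the source file. `archived_path` is a hypothetical helper, not a Spark API.

```python
import posixpath
from urllib.parse import urlparse


def archived_path(archive_dir, source_path):
    """Return where a processed source file is expected to land in the archive."""
    # Drop the scheme and host (for example hdfs://host:port) if present.
    source = urlparse(source_path).path
    return posixpath.join(archive_dir.rstrip("/"), source.lstrip("/"))


print(archived_path(
    "/user/itv007304/github/streaming/landing/archive/ghactivity/",
    "hdfs://m01.itversity.com:9000/user/itv007304/github/streaming/landing/"
    "ghactivity/year=2023/month=07/dayofmonth=14/2023-07-14-0.json.gz",
))
```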