This notebook shows how to use the `ShodanDatasetManager` to convert Shodan files into a single dataset in Delta format, and to enrich and manage that dataset.

In [1]:
import os
from pyspark.sql import SparkSession
from tlhop.shodan_dataset_manager import ShodanDatasetManager

TMP_PORT = os.environ["SPARK_UI_PORT"]
SPARK_TMP_PATH = os.environ["SPARK_TMP_PATH"]

In [None]:
spark = SparkSession.builder\
            .master("local[10]")\
            .config("spark.driver.memory", "40g")\
            .config("spark.local.dir", SPARK_TMP_PATH)\
            .config("spark.ui.port", TMP_PORT)\
            .getOrCreate()

In [3]:
FOLDER_INPUT = "XXXXX"
# os.path.join avoids a broken path when FOLDER_INPUT has no trailing slash
INPUT_FILES = [os.path.join(FOLDER_INPUT, f"BR.202307{day}.json.bz2") for day in ["10", "11"]]

TMP_OUTPUT = os.path.expanduser("~/shodan-dataset.delta")
TMP_OUTPUT_LOG = os.path.expanduser("~/conversion.log")
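
For longer collection periods, writing the list of days by hand becomes error-prone. The following is a minimal sketch, assuming the files keep the `<country>.<YYYYMMDD>.json.bz2` naming pattern seen above. `build_input_files` is a hypothetical helper (not part of tlhop), and the folder name in the example is a placeholder.

```python
import os
from datetime import date, timedelta


def build_input_files(folder, country, start, end):
    """Return one file path per day in [start, end], both ends inclusive.

    Assumes the naming pattern <country>.<YYYYMMDD>.json.bz2.
    """
    if end < start:
        raise ValueError("end must not be earlier than start")
    n_days = (end - start).days + 1
    days = (start + timedelta(days=i) for i in range(n_days))
    return [os.path.join(folder, f"{country}.{d:%Y%m%d}.json.bz2") for d in days]


# Example: the two days used in this notebook, under a placeholder folder
files = build_input_files("shodan-dumps", "BR", date(2023, 7, 10), date(2023, 7, 11))
print(files)
```

The helper only builds names; it does not check that each file exists, so days without a dump would still need to be filtered out before conversion.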

In [4]:
shodan_mgr = ShodanDatasetManager(output_folder=TMP_OUTPUT, output_log=TMP_OUTPUT_LOG)

In [None]:
shodan_mgr.convert_files(INPUT_FILES, org_refinement=True, fix_brazilian_cities=True)

After all files are converted, the dataset can be read with Spark's native API:

In [None]:
df = spark.read.format("delta").load(TMP_OUTPUT)
df.count()

### Dataset optimization

Spark may generate many small files over time. For that reason, we expose a method (`optimize_delta`) that optimizes the dataset by merging small files into larger ones.

In [None]:
shodan_mgr.optimize_delta(TMP_OUTPUT)
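
To judge whether compaction is worthwhile, it can help to look at how many Parquet files the dataset folder holds and how large they are. The following is a minimal sketch, assuming the dataset is stored on a local filesystem. `parquet_file_stats` is a hypothetical helper, not part of tlhop or Delta.

```python
import os


def parquet_file_stats(dataset_path):
    """Count the Parquet data files under dataset_path and summarize their sizes."""
    sizes = []
    for root, dirs, files in os.walk(dataset_path):
        # Skip the transaction log: its checkpoint files are Parquet too,
        # but they are metadata rather than data files.
        dirs[:] = [d for d in dirs if d != "_delta_log"]
        for name in files:
            if name.endswith(".parquet"):
                sizes.append(os.path.getsize(os.path.join(root, name)))
    total = sum(sizes)
    mean = total / len(sizes) if sizes else 0.0
    return {
        "files": len(sizes),
        "total_mb": total / 1024**2,
        "mean_mb": mean / 1024**2,
    }
```

Calling `parquet_file_stats(TMP_OUTPUT)` gives a rough picture only: since Delta keeps the files of earlier versions in the same folder, the result may include files that no longer belong to the current version.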

### Cleaning old versions

The Delta format supports time travel. To support this feature, files from older versions are kept inside the dataset folder (for instance, the version that existed before `optimize_delta` was run). Once we are sure that older dataset versions are no longer needed, we can use the `remove_old_delta_versions` method to force their removal.

In [None]:
shodan_mgr.remove_old_delta_versions(TMP_OUTPUT)
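
When part of the history should be preserved, Delta's own `vacuum` method accepts a retention period in hours. The sketch below shows one way to call it directly; it is an assumption about how the native API could be used, not a description of what `remove_old_delta_versions` does internally. `vacuum_keep_hours` is a hypothetical helper that receives an active `SparkSession` and a `delta.tables.DeltaTable`.

```python
RETENTION_CHECK_KEY = "spark.databricks.delta.retentionDurationCheck.enabled"
DEFAULT_RETENTION_HOURS = 168  # Delta's default retention threshold (7 days)


def vacuum_keep_hours(spark, delta_table, retention_hours=DEFAULT_RETENTION_HOURS):
    """Remove unreferenced files older than retention_hours from a Delta table."""
    if retention_hours < 0:
        raise ValueError("retention_hours must be non-negative")
    if retention_hours >= DEFAULT_RETENTION_HOURS:
        delta_table.vacuum(retention_hours)
        return
    # Delta rejects retention periods below its threshold unless the safety
    # check is disabled, so turn it off only for the duration of this call.
    previous = spark.conf.get(RETENTION_CHECK_KEY, "true")
    spark.conf.set(RETENTION_CHECK_KEY, "false")
    try:
        delta_table.vacuum(retention_hours)
    finally:
        spark.conf.set(RETENTION_CHECK_KEY, previous)
```

For example, `vacuum_keep_hours(spark, DeltaTable.forPath(spark, TMP_OUTPUT), 24)` would keep the files still needed by versions from roughly the last day. Versions whose files were removed can no longer be read through time travel.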

### Further Delta operations

Because the dataset is stored in Delta format, further operations are available through the native Delta API. For instance, we can inspect the complete dataset history:

In [None]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, TMP_OUTPUT)
deltaTable.history().toPandas()
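
The `version` column returned by `history()` can be combined with Delta's time travel reader option `versionAsOf` to load an earlier state of the dataset. The following is a minimal sketch; `read_delta_version` is a hypothetical helper that takes an active `SparkSession`. A version can only be read while its files are still present in the dataset folder.

```python
def read_delta_version(spark, path, version):
    """Load the Delta table at `path` as it was at the given version number."""
    if version < 0:
        raise ValueError("version must be a non-negative integer")
    return (
        spark.read.format("delta")
        .option("versionAsOf", version)  # Delta time travel by version number
        .load(path)
    )
```

For example, `read_delta_version(spark, TMP_OUTPUT, 0).count()` would count the records written by the first commit, which can then be compared with the count of the current version.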

In [11]:
spark.stop()