In [1]:
import pyspark
import pyspark
from pyspark.sql import SparkSession
import os
import time

## DEFINE SENSITIVE VARIABLES
# Connection settings are taken from the environment when present so that real
# credentials never have to be committed with the notebook. The fallbacks are the
# values that were previously hard-coded here.
NESSIE_URI = os.environ.get("NESSIE_URI", "http://nessie:19120/api/v1")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")

# Spark configuration: Iceberg + Nessie runtime jars, their SQL extensions, a catalog
# named `nessie` backed by NessieCatalog, and the object-store credentials.
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # Packages resolved through Ivy when the session starts
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        # SQL extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        # Catalog `nessie`: Iceberg SparkCatalog delegating to Nessie, on branch `main`
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse')
        .set('spark.sql.catalog.nessie.s3.endpoint', 'http://minio:9000')
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        # MinIO credentials
        # NOTE(review): these are Hadoop S3A properties, while the catalog's io-impl is
        # Iceberg's S3FileIO. Confirm how S3FileIO actually obtains its credentials in
        # this environment (e.g. AWS_* environment variables) -- TODO verify.
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
)

## Start Spark Session
# getOrCreate() reuses an already-running session if one exists in this kernel.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")


:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-667e253e-9c4c-4f8d-a65e-f2c9cd6e5ce7;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.3.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12;0.67.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central

24/12/18 15:12:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Running


### Create Schema and Temp View for the CSV File

In [2]:
# Ensure the target namespace exists in the `nessie` catalog (no-op when it already does).
create_schema_sql = "CREATE DATABASE IF NOT EXISTS nessie.towers_schema;"
spark.sql(create_schema_sql)

DataFrame[]

In [3]:
# Explicit imports instead of `import *` so it is clear where each type comes from
# and the notebook namespace is not flooded with every name in pyspark.sql.types.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Define schema
# The first CSV column has an empty header name; it is exposed as `_c0` here, which is
# why Spark logs a "CSV header does not conform to the schema" warning when the file
# is actually read.
schema = StructType([
    StructField("_c0", IntegerType(), True),
    StructField("radio", StringType(), True),
    StructField("MCC", IntegerType(), True),
    StructField("MNC", IntegerType(), True),
    StructField("TAC", IntegerType(), True),
    StructField("CID", LongType(), True),  # BigInt
    StructField("unit", IntegerType(), True),
    StructField("LON", DoubleType(), True),
    StructField("LAT", DoubleType(), True),
    StructField("RANGE", IntegerType(), True),
    StructField("SAM", IntegerType(), True),
    StructField("changeable", IntegerType(), True),
    StructField("created", LongType(), True),
    StructField("updated", LongType(), True),
    StructField("averageSignal", IntegerType(), True),
    StructField("Country", StringType(), True),
    StructField("Network", StringType(), True),
    StructField("Continent", StringType(), True),
])

# Read CSV with defined schema (no inference pass over the file) and expose it to
# Spark SQL as the temporary view `csv_asia_towers`.
csv_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("../datasets/Asia Towers.csv")
)
csv_df.createOrReplaceTempView("csv_asia_towers")


### CREATE AN ICEBERG TABLE FROM THE SQL VIEW

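One-step alternative (CTAS), shown for reference only; the cells below create the table with an explicit column list and then insert into it:

```python
spark.sql("CREATE TABLE IF NOT EXISTS nessie.towers_schema.asia_towers USING iceberg AS SELECT * FROM csv_asia_towers").show()
```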

In [4]:
# DDL for the unpartitioned Iceberg table, with every column spelled out explicitly.
# IF NOT EXISTS makes this cell safe to re-run.
create_asia_towers_sql = """
    CREATE TABLE IF NOT EXISTS nessie.towers_schema.asia_towers (
        _c0 INT,
        radio STRING,
        MCC INT,
        MNC INT,
        TAC INT,
        CID BIGINT,
        unit INT,
        LON DOUBLE,
        LAT DOUBLE,
        RANGE INT,
        SAM INT,
        changeable INT,
        created BIGINT,
        updated BIGINT,
        averageSignal INT,
        Country STRING,
        Network STRING,
        Continent STRING
    ) 
    USING iceberg 
"""
spark.sql(create_asia_towers_sql).show()


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


++
||
++
++



In [5]:
# Load the rows of the temporary CSV view into the Iceberg table.
# INSERT OVERWRITE is used instead of INSERT INTO: the table is created with
# IF NOT EXISTS, so re-running the notebook with a plain INSERT INTO would append a
# second copy of every row and skew the timings and aggregates below.
spark.sql("""
    INSERT OVERWRITE nessie.towers_schema.asia_towers 
    SELECT 
        _c0,
        radio,
        MCC,
        MNC,
        TAC,
        CID,
        unit,
        LON,
        LAT,
        RANGE,
        SAM,
        changeable,
        created,
        updated,
        averageSignal,
        Country,
        Network,
        Continent
    FROM csv_asia_towers
""").show()


[Stage 0:>                                                         (0 + 4) / 12]

24/12/18 15:12:56 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , radio, MCC, MNC, TAC, CID, unit, LON, LAT, RANGE, SAM, changeable, created, updated, averageSignal, Country, Network, Continent
 Schema: _c0, radio, MCC, MNC, TAC, CID, unit, LON, LAT, RANGE, SAM, changeable, created, updated, averageSignal, Country, Network, Continent
Expected: _c0 but found: 
CSV file: file:///home/docker/datasets/Asia%20Towers.csv


                                                                                

++
||
++
++



In [6]:
## QUERY THE ICEBERG TABLE
# Quick sanity check: print the first ten rows of the freshly loaded table.
preview_sql = "SELECT * FROM nessie.towers_schema.asia_towers limit 10"
spark.sql(preview_sql).show()

                                                                                

+--------+-----+---+---+---+---------+----+--------------+---------------+-----+---+----------+----------+----------+-------------+---------+-------+---------+
|     _c0|radio|MCC|MNC|TAC|      CID|unit|           LON|            LAT|RANGE|SAM|changeable|   created|   updated|averageSignal|  Country|Network|Continent|
+--------+-----+---+---+---+---------+----+--------------+---------------+-----+---+----------+----------+----------+-------------+---------+-------+---------+
|12142742|  GSM|525|  1| 63|      452|   0|    103.827896|       1.431656| 1000|  3|         1|1459692344|1487379337|            0|Singapore|SingTel|     Asia|
|12142743| UMTS|525|  1|315| 20666852|   0|    103.625793|       1.309433| 1000|  2|         1|1370464837|1370464837|            0|Singapore|SingTel|     Asia|
|12142744|  GSM|525|  1| 63|     5143|   0|    103.838882|       1.425247| 1000|  3|         1|1380831294|1461549037|            0|Singapore|SingTel|     Asia|
|12142745|  GSM|525|  1| 63|     5142|  

In [7]:
# Medium-weight benchmark query: average signal per (Country, Network) on the
# unpartitioned table.
mid_query = '''SELECT
        Country,
        Network,
        AVG(averageSignal) AS AvgSignal
    FROM nessie.towers_schema.asia_towers
    GROUP BY Country, Network;'''

# perf_counter() is a monotonic, high-resolution clock intended for measuring elapsed
# time; time.time() follows the wall clock and can jump if the system clock is adjusted.
start_time = time.perf_counter()
spark.sql(mid_query).show()
end_time = time.perf_counter()

execution_time = end_time - start_time
print(f"Execution Time: {execution_time:.2f} seconds")



+------------+--------------+---------+
|     Country|       Network|AvgSignal|
+------------+--------------+---------+
|  East Timor| Timor Telecom|      0.0|
|        Oman|Omantel Mobile|      0.0|
|      Taiwan| Taiwan Mobile|      0.0|
|        Iran|           MCI|      0.0|
|   Indonesia|     Telkomsel|      0.0|
|      Russia|           MTS|      0.0|
|      Russia|         Tele2|      0.0|
|   Indonesia|            XL|      0.0|
|  Bangladesh|        Airtel|      0.0|
|  Bangladesh|    Banglalink|      0.0|
|  Kazakhstan|         Kcell|      0.0|
|       China|  China Unicom|      0.0|
|   Singapore|       SingTel|      0.0|
|      Russia|       Megafon|      0.0|
|       China|  China Mobile|      0.0|
| Philippines|         Smart|      0.0|
|Saudi Arabia|STC / Al Jawal|      0.0|
|      Russia|       Beeline|      0.0|
|    Malaysia|        Celcom|      0.0|
|  Bangladesh|  GrameenPhone|      0.0|
+------------+--------------+---------+
only showing top 20 rows

Execution Time

                                                                                

In [8]:
# Heavier benchmark query against the unpartitioned table.
# Three CTEs each aggregate the same table by (Country, Network):
#   - AverageSignal: AVG(averageSignal)
#   - UniqueUnits:   COUNT(DISTINCT unit)
#   - RangeStats:    MAX / MIN / AVG of RANGE
# They are then inner-joined on (Country, Network) and sorted by those keys.
# The string is only defined here; it is executed and timed in a later cell.
query = '''WITH AverageSignal AS (
    SELECT
        Country,
        Network,
        AVG(averageSignal) AS AvgSignal
    FROM nessie.towers_schema.asia_towers
    GROUP BY Country, Network
),
UniqueUnits AS (
    SELECT
        Country,
        Network,
        COUNT(DISTINCT unit) AS UniqueUnitCount
    FROM nessie.towers_schema.asia_towers
    GROUP BY Country, Network
),
RangeStats AS (
    SELECT
        Country,
        Network,
        MAX(RANGE) AS MaxRange,
        MIN(RANGE) AS MinRange,
        AVG(RANGE) AS AvgRange
    FROM nessie.towers_schema.asia_towers
    GROUP BY Country, Network
)

SELECT
    a.Country,
    a.Network,
    a.AvgSignal,
    u.UniqueUnitCount,
    r.MaxRange,
    r.MinRange,
    r.AvgRange
FROM AverageSignal a
JOIN UniqueUnits u ON a.Country = u.Country AND a.Network = u.Network
JOIN RangeStats r ON a.Country = r.Country AND a.Network = r.Network
ORDER BY a.Country, a.Network;'''

In [9]:
# Run the multi-CTE query on the unpartitioned table and report how long it took.
# perf_counter() is monotonic, so the measured interval cannot be distorted by
# wall-clock adjustments the way time.time() can.
start_time = time.perf_counter()
spark.sql(query).show()
end_time = time.perf_counter()

execution_time = end_time - start_time
print(f"Execution Time: {execution_time:.2f} seconds")

                                                                                

+-----------+------------+---------+---------------+--------+--------+------------------+
|    Country|     Network|AvgSignal|UniqueUnitCount|MaxRange|MinRange|          AvgRange|
+-----------+------------+---------+---------------+--------+--------+------------------+
|   Abkhazia|    A-Mobile|      0.0|              3|   93410|     962|1645.9551020408164|
|   Abkhazia|     Aquafon|      0.0|             72|   28607|    1000|1365.2775800711743|
|Afghanistan|        AWCC|      0.0|              8|   30106|     604|3147.8259242957747|
|Afghanistan|    Etisalat|      0.0|              7|    9575|     611| 1045.543431750107|
|Afghanistan|         MTN|      0.0|              2|   11542|     368| 1125.530731707317|
|Afghanistan|    Mobifone|      0.0|             43|   12434|    1000|            1598.5|
|Afghanistan|      Roshan|      0.0|              2|   15803|     500|1560.7526981190256|
|    Bahrain|     Batelco|      0.0|              2|   21567|    1000|1416.4400908574673|
|    Bahra

### CREATE PARTITIONED TABLE FROM THE SQL VIEW

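One-step alternative (CTAS), shown for reference only; the cells below create the partitioned table with an explicit column list and then insert into it:

```python
spark.sql("""
    CREATE TABLE IF NOT EXISTS nessie.towers_schema.asia_towers_partitioned
    USING iceberg 
    PARTITIONED BY (Country, Network) 
    AS 
    SELECT * 
    FROM csv_asia_towers
    ORDER BY Country, Network
""").show()
```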

In [10]:
# DDL for the partitioned variant of the table: same columns as the unpartitioned one,
# but partitioned by (Country, Network). IF NOT EXISTS makes this cell safe to re-run.
create_partitioned_sql = """
    CREATE TABLE IF NOT EXISTS nessie.towers_schema.asia_towers_partitioned (
        _c0 INT,
        radio STRING,
        MCC INT,
        MNC INT,
        TAC INT,
        CID BIGINT,
        unit INT,
        LON DOUBLE,
        LAT DOUBLE,
        RANGE INT,
        SAM INT,
        changeable INT,
        created BIGINT,
        updated BIGINT,
        averageSignal INT,
        Country STRING,
        Network STRING,
        Continent STRING
    ) 
    USING iceberg 
    PARTITIONED BY (Country, Network) 
"""
spark.sql(create_partitioned_sql).show()


++
||
++
++



In [11]:
# Load the rows of the temporary CSV view into the partitioned Iceberg table.
# INSERT OVERWRITE is used instead of INSERT INTO: the table is created with
# IF NOT EXISTS, so re-running the notebook with a plain INSERT INTO would append a
# second copy of every row and make the comparison with the unpartitioned table unfair.
spark.sql("""
    INSERT OVERWRITE nessie.towers_schema.asia_towers_partitioned
    SELECT 
        _c0,
        radio,
        MCC,
        MNC,
        TAC,
        CID,
        unit,
        LON,
        LAT,
        RANGE,
        SAM,
        changeable,
        created,
        updated,
        averageSignal,
        Country,
        Network,
        Continent
    FROM csv_asia_towers
""").show()


24/12/18 15:14:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , radio, MCC, MNC, TAC, CID, unit, LON, LAT, RANGE, SAM, changeable, created, updated, averageSignal, Country, Network, Continent
 Schema: _c0, radio, MCC, MNC, TAC, CID, unit, LON, LAT, RANGE, SAM, changeable, created, updated, averageSignal, Country, Network, Continent
Expected: _c0 but found: 
CSV file: file:///home/docker/datasets/Asia%20Towers.csv


                                                                                

++
||
++
++



In [12]:
# Medium-weight benchmark query, this time against the partitioned table.
# NOTE: this rebinds `mid_query`, replacing the unpartitioned version defined earlier.
mid_query = '''SELECT
        Country,
        Network,
        AVG(averageSignal) AS AvgSignal
    FROM nessie.towers_schema.asia_towers_partitioned
    GROUP BY Country, Network;'''

# perf_counter() is a monotonic, high-resolution clock intended for measuring elapsed
# time; time.time() follows the wall clock and can jump if the system clock is adjusted.
start_time = time.perf_counter()
spark.sql(mid_query).show()
end_time = time.perf_counter()

execution_time = end_time - start_time
print(f"Execution Time: {execution_time:.2f} seconds")



+---------+----------------+---------+
|  Country|         Network|AvgSignal|
+---------+----------------+---------+
|   Russia|      SEVTELECOM|      0.0|
|Singapore|     TPG Telecom|      0.0|
|   Israel|        YouPhone|      0.0|
|     Iran|        Telekish|      0.0|
|   Jordan|          Orange|      0.0|
|   Russia|           Motiv|      0.0|
|   Israel|      Hot Mobile|      0.0|
| Cambodia|        Cellcard|      0.0|
|   Russia|         Skylink|      0.0|
|    Japan|            KDDI|      0.0|
|   Russia|     Krymtelecom|      0.0|
|   Taiwan|Chunghwa Telecom|      0.0|
|Indonesia|       Telkomsel|      0.0|
|    Macao|   China Telecom|      0.0|
| Mongolia|          Unitel|      0.0|
|     Iran|          Taliya|      0.0|
|Singapore|              M1|      0.0|
|Hong Kong|SUN Mobile / CSL|      0.0|
|    Nepal|           Smart|      0.0|
|    China|   China Telecom|      0.0|
+---------+----------------+---------+
only showing top 20 rows

Execution Time: 6.13 seconds


                                                                                

In [13]:
# Heavier benchmark query against the partitioned table.
# Three CTEs each aggregate the same table by (Country, Network):
#   - AverageSignal: AVG(averageSignal)
#   - UniqueUnits:   COUNT(DISTINCT unit)
#   - RangeStats:    MAX / MIN / AVG of RANGE
# They are then inner-joined on (Country, Network) and sorted by those keys.
# The string is only defined here; it is executed and timed in a later cell.
query_on_partitioned_table = '''WITH AverageSignal AS (
    SELECT
        Country,
        Network,
        AVG(averageSignal) AS AvgSignal
    FROM nessie.towers_schema.asia_towers_partitioned
    GROUP BY Country, Network
),
UniqueUnits AS (
    SELECT
        Country,
        Network,
        COUNT(DISTINCT unit) AS UniqueUnitCount
    FROM nessie.towers_schema.asia_towers_partitioned
    GROUP BY Country, Network
),
RangeStats AS (
    SELECT
        Country,
        Network,
        MAX(RANGE) AS MaxRange,
        MIN(RANGE) AS MinRange,
        AVG(RANGE) AS AvgRange
    FROM nessie.towers_schema.asia_towers_partitioned
    GROUP BY Country, Network
)

SELECT
    a.Country,
    a.Network,
    a.AvgSignal,
    u.UniqueUnitCount,
    r.MaxRange,
    r.MinRange,
    r.AvgRange
FROM AverageSignal a
JOIN UniqueUnits u ON a.Country = u.Country AND a.Network = u.Network
JOIN RangeStats r ON a.Country = r.Country AND a.Network = r.Network
ORDER BY a.Country, a.Network;'''

In [14]:
# Run the multi-CTE query on the partitioned table and report how long it took.
# perf_counter() is monotonic, so the measured interval cannot be distorted by
# wall-clock adjustments the way time.time() can.
start_time = time.perf_counter()
spark.sql(query_on_partitioned_table).show()
end_time = time.perf_counter()

execution_time = end_time - start_time
print(f"Execution Time: {execution_time:.2f} seconds")



+-----------+------------+---------+---------------+--------+--------+------------------+
|    Country|     Network|AvgSignal|UniqueUnitCount|MaxRange|MinRange|          AvgRange|
+-----------+------------+---------+---------------+--------+--------+------------------+
|   Abkhazia|    A-Mobile|      0.0|              3|   93410|     962|1645.9551020408164|
|   Abkhazia|     Aquafon|      0.0|             72|   28607|    1000|1365.2775800711743|
|Afghanistan|        AWCC|      0.0|              8|   30106|     604|3147.8259242957747|
|Afghanistan|    Etisalat|      0.0|              7|    9575|     611| 1045.543431750107|
|Afghanistan|         MTN|      0.0|              2|   11542|     368| 1125.530731707317|
|Afghanistan|    Mobifone|      0.0|             43|   12434|    1000|            1598.5|
|Afghanistan|      Roshan|      0.0|              2|   15803|     500|1560.7526981190256|
|    Bahrain|     Batelco|      0.0|              2|   21567|    1000|1416.4400908574673|
|    Bahra

                                                                                