In [None]:
%run ~/skatt-naering/src/settings.py
%run ~/skatt-naering/production/naeringsspesifikasjon/config_naeringsspesifikasjon.py

In [None]:
import time

In [None]:
import dask.dataframe as dd
import gcsfs
import pandas as pd
import pyarrow.parquet as pq
import pyspark.pandas as ps
from dapla import FileClient
from dapla.auth import AuthClient
from pyspark.sql import SparkSession

In [None]:
from nst import functions

# Read in Data using pyarrow

In [None]:
# Benchmark: read both test datasets into Arrow tables with pyarrow only.
# The tables are never converted to pandas in this cell, so only the parquet
# read itself is timed.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
files1 = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if path.endswith(".parquet")
]
# Parquet files of the long ('langt') layout.
files2 = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]

# Accumulated wall-clock seconds per dataset over all iterations.
total_time_bredt = total_time_langt = 0

iterations = 10

for _ in range(iterations):
    # Wide dataset: time one full read of all files.
    start_time = time.time()
    tabley = pq.read_table(files1, filesystem=fs)
    total_time_bredt = total_time_bredt + (time.time() - start_time)

    # Long dataset: same measurement; `tabley` is simply overwritten.
    start_time = time.time()
    tabley = pq.read_table(files2, filesystem=fs)
    total_time_langt = total_time_langt + (time.time() - start_time)

# Mean seconds per read for each dataset.
avg_time_bredt, avg_time_langt = (
    total_time_bredt / iterations,
    total_time_langt / iterations,
)

print(
    f"Average time taken to read 'bredt' dataset using pyarrow: {avg_time_bredt:.2f} seconds"
)
print(
    f"Average time taken to read 'langt' dataset using pyarrow: {avg_time_langt:.2f} seconds"
)

# Read in data using Dask with the pyarrow engine

In [None]:
from dapla import FileClient


# Benchmark: build Dask DataFrames over both test datasets with the pyarrow
# engine. `.compute()` is deliberately not called, so no data is brought into
# memory and only the cost of setting up the Dask DataFrame is timed.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
bredt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if path.endswith(".parquet")
]
# Parquet files of the long ('langt') layout.
langt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]

# Accumulated wall-clock seconds per dataset over all iterations.
total_time_bredt = total_time_langt = 0

iterations = 10

for _ in range(iterations):
    # Wide dataset (lazy; add `bredt_df.compute()` to also load the data).
    start_time = time.time()
    bredt_df = dd.read_parquet(bredt_path, engine="pyarrow", filesystem=fs)
    total_time_bredt = total_time_bredt + (time.time() - start_time)

    # Long dataset (lazy; add `langt_df.compute()` to also load the data).
    start_time = time.time()
    langt_df = dd.read_parquet(langt_path, engine="pyarrow", filesystem=fs)
    total_time_langt = total_time_langt + (time.time() - start_time)

# Mean seconds per call for each dataset.
avg_time_bredt, avg_time_langt = (
    total_time_bredt / iterations,
    total_time_langt / iterations,
)

print(
    f"Average time taken to read 'bredt' dataset using Dask with pyarrow engine: {avg_time_bredt:.2f} seconds"
)
print(
    f"Average time taken to read 'langt' dataset using Dask with pyarrow engine: {avg_time_langt:.2f} seconds"
)

# Read in data and create a pandas DataFrame using pyarrow

In [None]:
import time

In [None]:
import pandas as pd
import pyarrow.parquet as pq

In [None]:
from dapla import FileClient


# Benchmark: read the wide dataset with pyarrow AND convert it to a pandas
# DataFrame, timing both steps together.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
files1 = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if path.endswith(".parquet")
]
# Parquet files of the long ('langt') layout (only used by the disabled
# measurement below).
files2 = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]

# Accumulated wall-clock seconds per dataset over all iterations.
total_time_bredt = total_time_langt = 0

iterations = 3

for _ in range(iterations):
    # Wide dataset: Arrow read followed by the pandas conversion.
    start_time = time.time()
    tabley = pq.read_table(files1, filesystem=fs)
    df_bredt = tabley.to_pandas()
    total_time_bredt = total_time_bredt + (time.time() - start_time)

    # The equivalent 'langt' measurement is left disabled in this cell:
    # start_time = time.time()
    # tabley = pq.read_table(files2, filesystem=fs)
    # df_langt = tabley.to_pandas()
    # total_time_langt += time.time() - start_time

# Mean seconds per read + conversion (only 'bredt' is measured here).
avg_time_bredt = total_time_bredt / iterations
# avg_time_langt = total_time_langt / iterations

print(
    f"Average time taken to read and create DataFrame for 'bredt' dataset using pyarrow: {avg_time_bredt:.2f} seconds"
)


# Read in data and create an in-memory DataFrame using Dask with the pyarrow engine

In [None]:
import time

In [None]:
import dask.dataframe as dd

In [None]:
from dapla import FileClient


# Benchmark: read both datasets through Dask (pyarrow engine) and call
# `.compute()` so the data is actually loaded into memory inside the timed
# section.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
bredt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if path.endswith(".parquet")
]
# Parquet files of the long ('langt') layout.
langt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]

# Accumulated wall-clock seconds per dataset over all iterations.
total_time_bredt = total_time_langt = 0

iterations = 2

for _ in range(iterations):
    # Wide dataset. The name is rebound twice on purpose: first to the lazy
    # Dask DataFrame, then to the materialised result of `.compute()`
    # (an in-memory DataFrame, no longer a Dask collection).
    start_time = time.time()
    bredt_ddf = dd.read_parquet(bredt_path, engine="pyarrow", filesystem=fs)
    bredt_ddf = bredt_ddf.compute()
    total_time_bredt = total_time_bredt + (time.time() - start_time)

    # Long dataset, measured the same way.
    start_time = time.time()
    langt_ddf = dd.read_parquet(langt_path, engine="pyarrow", filesystem=fs)
    langt_ddf = langt_ddf.compute()
    total_time_langt = total_time_langt + (time.time() - start_time)

# Mean seconds per read + compute for each dataset.
avg_time_bredt, avg_time_langt = (
    total_time_bredt / iterations,
    total_time_langt / iterations,
)

print(
    f"Average time taken to read and create DataFrame for 'bredt' dataset using Dask with pyarrow engine: {avg_time_bredt:.2f} seconds"
)
print(
    f"Average time taken to read and create DataFrame for 'langt' dataset using Dask with pyarrow engine: {avg_time_langt:.2f} seconds"
)

# Pyarrow with DuckDB

In [None]:
import time

In [None]:
import duckdb
import pandas as pd
import pyarrow.parquet as pq

In [None]:
from dapla import FileClient


# Benchmark: read each dataset into an Arrow table with pyarrow, then let
# DuckDB turn that table into a pandas DataFrame via `SELECT *`.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
files1 = [
    f
    for f in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if f.endswith(".parquet")
]
# Parquet files of the long ('langt') layout.
files2 = [
    f
    for f in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if f.endswith(".parquet")
]

# Accumulated wall-clock seconds per dataset over all iterations.
total_time_bredt = 0
total_time_langt = 0

con = duckdb.connect()

iterations = 3

try:
    for _ in range(iterations):
        # 'bredt' dataset
        start_time = time.time()
        tabley = pq.read_table(files1, filesystem=fs)
        # The SQL refers to the Python variable `tabley` by name; DuckDB
        # resolves it to the Arrow table above (replacement scan).
        df_bredt = con.execute("Select * from tabley").df()
        total_time_bredt += time.time() - start_time

        # 'langt' dataset
        start_time = time.time()
        tabley = pq.read_table(files2, filesystem=fs)
        df_langt = con.execute("Select * from tabley").df()
        total_time_langt += time.time() - start_time
finally:
    # Release the DuckDB connection even if a read or query fails.
    con.close()

# Mean seconds per read + conversion for each dataset.
avg_time_bredt = total_time_bredt / iterations
avg_time_langt = total_time_langt / iterations

# The messages name DuckDB explicitly so this output cannot be confused with
# the pure-pyarrow benchmark, which previously printed identical text.
print(
    f"Average time taken to read and create DataFrame for 'bredt' dataset using pyarrow with DuckDB: {avg_time_bredt:.2f} seconds"
)
print(
    f"Average time taken to read and create DataFrame for 'langt' dataset using pyarrow with DuckDB: {avg_time_langt:.2f} seconds"
)
# NOTE(review): a previously commented-out print here reported
# "Failure. Not enough memory" for the 'langt' dataset - presumably from an
# earlier run; confirm that 'langt' fits in memory before relying on this cell.

# Partition

In [None]:
import dask.dataframe as dd

In [None]:
from dapla import FileClient


# Write a copy of the long ('langt') dataset partitioned on
# hovedtema / undertema / gruppe.

fs = FileClient.get_gcs_file_system()

# Read the source data explicitly in this cell instead of relying on whatever
# `langt_df` currently holds in the kernel: other cells rebind that name (the
# Dask query benchmark filters it down to a single foretak and p3 rows), so
# using it here could silently write a truncated dataset.
langt_source_files = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]
langt_unpartitioned_df = dd.read_parquet(
    langt_source_files, engine="pyarrow", filesystem=fs
)

# Partition the 'langt' dataset
partitioned_path = (
    f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/partitioned_langt_data"
)
langt_unpartitioned_df.to_parquet(
    partitioned_path,
    partition_on=["hovedtema", "undertema", "gruppe"],
    engine="pyarrow",
    filesystem=fs,
)

# Dask queries

In [None]:
from dapla import FileClient


# Benchmark: for a single norskIdentifikator, sum all "p3" values with Dask
# (pyarrow engine) across four storage layouts: wide, long, partitioned long
# and partitioned wide. Each timed section includes the filtered read and the
# `.compute()` of the sum.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
bredt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if path.endswith(".parquet")
]
# Parquet files of the long ('langt') layout.
langt_path = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if path.endswith(".parquet")
]
# Partitioned long layout, restricted through the glob to the
# resultatregnskap / driftsinntekt partitions.
partitioned_path_langt = [
    path
    for path in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/partitioned_langt_data/hovedtema=resultatregnskap/undertema=driftsinntekt/**/*"
    )
    if path.endswith(".parquet")
]

# NOTE(review): these two locations are hard-coded bucket paths rather than
# being derived from TEMP_PATH like the lists above - confirm they stay in
# sync with the configured environment.
partitioned_path_bredt = [
    "ssb-prod-skatt-naering-data-produkt/temp/naeringsspesifikasjon_data/g2022/resultregnskap_balanseregnskap_testfiler/partitioned_wide_data/resultatregnskap/driftsinntekt/salgsinntekt/salgsinntekt",
    "ssb-prod-skatt-naering-data-produkt/temp/naeringsspesifikasjon_data/g2022/resultregnskap_balanseregnskap_testfiler/partitioned_wide_data/resultatregnskap/driftsinntekt/annenDriftsinntekt/annenDriftsinntekt",
]

# Accumulated wall-clock seconds per layout over all iterations.
total_time_bredt = total_time_langt = 0
total_time_partitioned_langt = total_time_partitioned_bredt = 0

iterations = 5

for _ in range(iterations):
    # --- wide layout -------------------------------------------------------
    start_time = time.time()

    bredt_df = dd.read_parquet(
        bredt_path,
        engine="pyarrow",
        filesystem=fs,
        filters=[("norskIdentifikator", "==", "00002047889")],
    )

    # Despite the `avg_` prefix, this holds the per-column SUM of every
    # column whose name starts with "p3".
    avg_p3_columns = (
        bredt_df[[name for name in bredt_df.columns if name.startswith("p3")]]
        .sum()
        .compute()
    )

    total_time_bredt = total_time_bredt + (time.time() - start_time)

    # --- long layout -------------------------------------------------------
    start_time = time.time()

    # Read only the rows belonging to the given norskIdentifikator.
    langt_df = dd.read_parquet(
        langt_path,
        engine="pyarrow",
        filesystem=fs,
        filters=[("norskIdentifikator", "==", "00002047889")],
    )

    # Keep the rows whose 'felt_id' starts with "p3".
    langt_df = langt_df[langt_df["felt_id"].str.startswith("p3")]

    # 'felt_verdi' is converted to numeric partition by partition; values that
    # cannot be parsed are coerced instead of raising.
    langt_df["felt_verdi"] = langt_df["felt_verdi"].map_partitions(
        pd.to_numeric, errors="coerce"
    )

    # Again a sum, not an average.
    avg_p3_value = langt_df["felt_verdi"].sum().compute()

    total_time_langt = total_time_langt + (time.time() - start_time)

    # --- partitioned long layout ------------------------------------------
    start_time = time.time()

    partitioned_df = dd.read_parquet(
        partitioned_path_langt,
        engine="pyarrow",
        filesystem=fs,
        filters=[("norskIdentifikator", "==", "00002047889")],
    )

    # Same pipeline as for the unpartitioned long layout.
    partitioned_df = partitioned_df[partitioned_df["felt_id"].str.startswith("p3")]

    partitioned_df["felt_verdi"] = partitioned_df["felt_verdi"].map_partitions(
        pd.to_numeric, errors="coerce"
    )

    avg_p3_value = partitioned_df["felt_verdi"].sum().compute()

    total_time_partitioned_langt = total_time_partitioned_langt + (
        time.time() - start_time
    )

    # --- partitioned wide layout ------------------------------------------
    start_time = time.time()

    partitioned_df = dd.read_parquet(
        partitioned_path_bredt,
        engine="pyarrow",
        filesystem=fs,
        filters=[("norskIdentifikator", "==", "00002047889")],
    )

    avg_p3_columns = (
        partitioned_df[
            [name for name in partitioned_df.columns if name.startswith("p3")]
        ]
        .sum()
        .compute()
    )

    total_time_partitioned_bredt = total_time_partitioned_bredt + (
        time.time() - start_time
    )


# Mean seconds per query for each layout.
avg_time_bredt, avg_time_langt = (
    total_time_bredt / iterations,
    total_time_langt / iterations,
)
avg_time_partitioned_langt, avg_time_partitioned_bredt = (
    total_time_partitioned_langt / iterations,
    total_time_partitioned_bredt / iterations,
)

print(
    f"Average time taken to compute sum_intekter in the 'bredt' dataset for a single foretak using Dask with pyarrow engine: {avg_time_bredt:.2f} seconds"
)
print(
    f"Average time taken to compute sum_intekter in the 'langt' dataset for a single foretak using Dask with pyarrow engine: {avg_time_langt:.2f} seconds"
)
print(
    f"Average time taken to compute sum_intekter in the 'partitioned_langt' dataset for a single foretak using Dask with pyarrow engine: {avg_time_partitioned_langt:.2f} seconds"
)
print(
    f"Average time taken to compute sum_intekter in the 'partitioned_bredt' dataset for a single foretak using Dask with pyarrow engine: {avg_time_partitioned_bredt:.2f} seconds"
)

# Pyarrow queries

In [None]:
#

# DuckDB queries

In [None]:
import duckdb
from dapla import FileClient


# Benchmark: for a single norskIdentifikator, sum every "p3" column of the
# wide ('bredt') dataset with DuckDB.

fs = FileClient.get_gcs_file_system()

# Parquet files of the wide ('bredt') layout.
bredt_path = [
    f
    for f in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
    )
    if f.endswith(".parquet")
]
# Parquet files of the long ('langt') layout (not queried yet, see below).
langt_path = [
    f
    for f in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
    )
    if f.endswith(".parquet")
]
# Partitioned long layout (not queried yet, see below).
partitioned_path = [
    f
    for f in fs.glob(
        f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/partitioned_langt_data/hovedtema=resultatregnskap/undertema=driftsinntekt/**/*"
    )
    if f.endswith(".parquet")
]


# Accumulated wall-clock seconds per dataset over all iterations. Only the
# 'bredt' total is filled in so far; the other two are placeholders.
total_time_bredt = 0
total_time_langt = 0
total_time_partitioned = 0

iterations = 5

# Initialize DuckDB connection
conn = duckdb.connect()

try:
    for _ in range(iterations):
        # 'bredt' dataset
        start_time = time.time()

        # Expose the rows of the requested foretak as a view.
        # NOTE(review): only the first file of `bredt_path` is scanned, and it
        # is passed as a bare bucket path - confirm DuckDB can resolve that
        # location in this environment and whether all files should be read.
        conn.execute(
            f"CREATE OR REPLACE VIEW bredt_data AS SELECT * FROM parquet_scan('{bredt_path[0]}') WHERE norskIdentifikator = '00002047889'"
        )

        # Sum every column whose NAME starts with "p3". The previous query
        # (`SUM(column_name) ... WHERE column_name LIKE 'p3%'`) referenced a
        # column that does not exist; DuckDB's COLUMNS(<regex>) expression
        # selects columns by name instead. The result is one row holding one
        # sum per matching column.
        # Assumes the p3* columns are numeric - TODO confirm; add a cast if
        # they are stored as text.
        avg_p3_columns_duckdb = conn.execute(
            "SELECT SUM(COLUMNS('^p3.*')) FROM bredt_data"
        ).fetchall()[0]

        total_time_bredt += time.time() - start_time

        # Repeat similar steps for 'langt' and 'partitioned' dataset if needed

    # Mean seconds per query. The elapsed time is accumulated in
    # `total_time_bredt`; the previously used `total_time_duckdb` was never
    # defined and raised NameError.
    avg_time_duckdb = total_time_bredt / iterations

    print(
        f"Average time taken using DuckDB for the 'bredt' dataset: {avg_time_duckdb:.2f} seconds"
    )
finally:
    # Always drop the view and release the connection, even when a query fails.
    conn.execute("DROP VIEW IF EXISTS bredt_data")
    conn.close()

In [None]:
# import shutil
# import os

# import duckdb
# from dapla import FileClient

# fs = FileClient.get_gcs_file_system()

# bredt_path = [
#     f
#     for f in fs.glob(
#         f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_bredt/*"
#     )
#     if f.endswith(".parquet")
# ]
# langt_path = [
#     f
#     for f in fs.glob(
#         f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/resultregnskap_balanseregnskap_testfil_langt/*"
#     )
#     if f.endswith(".parquet")
# ]
# partitioned_path = [
#     f
#     for f in fs.glob(
#         f"{TEMP_PATH}/resultregnskap_balanseregnskap_testfiler/partitioned_langt_data/hovedtema=resultatregnskap/undertema=driftsinntekt/**/*"
#     )
#     if f.endswith(".parquet")
# ]


# # Initialize DuckDB connection
# conn = duckdb.connect()

# # We will store total time taken for DuckDB in this variable
# total_time_duckdb = 0

# # Define a local path to save the downloaded Parquet file
# local_path = "temp_parquet_file.parquet"

# for _ in range(iterations):
#     # Download the file from GCS to the local system
#     with fs.open(bredt_path[0], "rb") as f_remote, open(local_path, "wb") as f_local:
#         shutil.copyfileobj(f_remote, f_local)

#     # 'bredt' dataset
#     start_time = time.time()

#     # Load the locally stored Parquet data into DuckDB
#     conn.execute(
#         f"CREATE OR REPLACE VIEW bredt_data AS SELECT * FROM parquet_scan('{local_path}') WHERE norskIdentifikator = '00002047889'"
#     )

#     # Calculate average for columns starting with 'p3'
#     avg_p3_columns_duckdb = conn.execute(
#         "SELECT SUM(column_name) FROM bredt_data WHERE column_name LIKE 'p3%'"
#     ).fetchall()[0][0]

#     total_time_duckdb += time.time() - start_time

#     # Optionally remove the local file to free up space
#     os.remove(local_path)

#     # Repeat similar steps for 'langt' and 'partitioned' dataset if needed

# # Compute the average time for DuckDB
# avg_time_duckdb = total_time_duckdb / iterations

# print(
#     f"Average time taken using DuckDB for the 'bredt' dataset: {avg_time_duckdb:.2f} seconds"
# )

# # Clean up (Optional)
# conn.execute("DROP VIEW IF EXISTS bredt_data")
# conn.close()