In [7]:
import os
import kaggle
from pyspark.sql import SparkSession
from io import StringIO
import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient
from loguru import logger


def fetch_kaggle_dataset_as_dataframe(dataset_name, file_name):
    """
    Fetch a specified Kaggle dataset file and return it as a Pandas DataFrame.

    Args:
    - dataset_name (str): The identifier for the dataset in format "USERNAME/DATASET".
    - file_name (str): The specific file within the dataset.

    Returns:
    - pd.DataFrame: DataFrame containing the dataset's data.
    """

    # Create a temporary directory for the Kaggle dataset
    download_dir = "./temp_kaggle_download"
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)
        logger.info(f"Created directory: {download_dir}")

    # Authenticate and download the dataset
    logger.info(f"Authenticating and downloading dataset: {dataset_name}")
    kaggle.api.authenticate()
    kaggle.api.dataset_download_files(dataset_name, path=download_dir, unzip=True)

    # Path to the desired file within the dataset
    file_path = os.path.join(download_dir, file_name)

    if not os.path.exists(file_path):
        logger.error(f"File {file_name} not found in downloaded dataset.")
        return None

    # Load the file into a Pandas DataFrame
    logger.info(f"Reading file {file_name} into DataFrame.")
    df = pd.read_csv(file_path)

    # Clean up (delete the temporary dataset directory)
    logger.info("Cleaning up temporary files.")
    os.remove(file_path)
    os.rmdir(download_dir)

    return df

In [8]:
def group_and_count_ordered(df_pandas):
    """
    Receives a Pandas DataFrame, uses Spark to perform row count grouped by 'town',
    orders the result by count, and returns the result as a Pandas DataFrame.

    Args:
    - df_pandas (pd.DataFrame): Input Pandas DataFrame.

    Returns:
    - pd.DataFrame: Resultant DataFrame with counts per town ordered by count.
    """

    logger.info("Initializing Spark session.")
    # Initialize Spark session
    spark = SparkSession.builder.appName("GroupByTownOrdered").getOrCreate()

    # Convert Pandas DataFrame to Spark DataFrame
    logger.info("Converting Pandas DataFrame to Spark DataFrame.")
    df_spark = spark.createDataFrame(df_pandas)

    # Group by 'town', count rows, and order by count
    logger.info("Grouping by 'town', counting rows, and ordering by count.")
    grouped_df_spark = (
        df_spark.groupBy("town").count().orderBy("count", ascending=False)
    )

    # Convert the result back to Pandas DataFrame
    logger.info("Converting the result back to Pandas DataFrame.")
    result_df_pandas = grouped_df_spark.toPandas()

    # Stop the Spark session
    logger.info("Stopping the Spark session.")
    spark.stop()

    return result_df_pandas

In [9]:

def write_df_to_adls2(account_name, account_key, file_system_name, file_path, df):
    """
    Write a pandas DataFrame to Azure Data Lake Storage Gen2 as a CSV file.

    Parameters:
    - account_name (str): Azure storage account name
    - account_key (str): Azure storage account key
    - file_system_name (str): Name of the file system (container) in ADLS Gen2
    - file_path (str): Path of the file inside the file system, including its name
    - df (pd.DataFrame): DataFrame to be written
    """
    logger.info(
        f"Writing DataFrame to Azure Data Lake Storage Gen2 at {file_path} in {file_system_name}."
    )

    # Create a Data Lake service client using account name and key
    logger.debug("Creating Data Lake service client.")
    service_client = DataLakeServiceClient(
        account_url=f"https://{account_name}.dfs.core.windows.net",
        credential=account_key,
    )

    # Get the file system client for the specified file system
    logger.debug(f"Getting file system client for: {file_system_name}.")
    file_system_client = service_client.get_file_system_client(file_system_name)

    # Get the data lake file client for the specified file path
    logger.debug(f"Getting file client for: {file_path}.")
    file_client = file_system_client.get_file_client(file_path)

    # Convert the dataframe to CSV format and get the content in bytes
    logger.debug("Converting DataFrame to CSV format.")
    csv_content = StringIO()
    df.to_csv(csv_content, index=False)
    csv_bytes = csv_content.getvalue().encode("utf-8")

    # Upload the content to the file
    logger.info("Uploading CSV content to Azure Data Lake Storage Gen2.")
    file_client.upload_data(csv_bytes, overwrite=True)
    logger.success(f"Successfully uploaded DataFrame to {file_path}.")
# Example Usage
# df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
# write_df_to_adls2('YOUR_ACCOUNT_NAME', 'YOUR_ACCOUNT_KEY', 'YOUR_FILE_SYSTEM_NAME', 'path/to/yourfile.csv', df)


In [10]:
# Example Usage: Fetch a specified Kaggle dataset file and return it as a Pandas DataFrame.
dataset_id = "anoopjohny/real-estate-sales-2001-2020-state-of-connecticut"  # Use your desired dataset's identifier
file_in_dataset = "Real_Estate_Sales_2001-2020_GL.csv"  # Use the specific file name you want within the dataset
dataframe = fetch_kaggle_dataset_as_dataframe(dataset_id, file_in_dataset)
print("Number of lines present:-",  
      len(dataframe)) 

2023-10-22 17:45:01.838 | INFO     | __main__:fetch_kaggle_dataset_as_dataframe:26 - Created directory: ./temp_kaggle_download
2023-10-22 17:45:01.840 | INFO     | __main__:fetch_kaggle_dataset_as_dataframe:29 - Authenticating and downloading dataset: anoopjohny/real-estate-sales-2001-2020-state-of-connecticut
2023-10-22 17:45:09.310 | INFO     | __main__:fetch_kaggle_dataset_as_dataframe:41 - Reading file Real_Estate_Sales_2001-2020_GL.csv into DataFrame.
  df = pd.read_csv(file_path)
2023-10-22 17:45:11.073 | INFO     | __main__:fetch_kaggle_dataset_as_dataframe:45 - Cleaning up temporary files.


Number of lines present:- 997213


In [11]:
# Example Usage: Receives a Pandas DataFrame, uses Spark to perform row count grouped by 'town', orders the result by count, and returns the result as a Pandas DataFrame.
df = pd.DataFrame(dataframe)
result = group_and_count_ordered(df)
print(result.head(20))

2023-10-22 17:45:11.288 | INFO     | __main__:group_and_count_ordered:13 - Initializing Spark session.
2023-10-22 17:45:11.486 | INFO     | __main__:group_and_count_ordered:18 - Converting Pandas DataFrame to Spark DataFrame.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
2023-10-22 17:45:43.259 | INFO     | __main__:group_and_count_ordered:22 - Grouping by 'town', counting rows, and ordering by count.
2023-10-22 17:45:43.291 | INFO     | __main__:group_and_count_ordered:28 - Converting the result back to Pandas DataFrame.
23/10/22 17:45:43 WARN TaskSetManager: Stage 0 contains a task of very large size (7088 KiB). The maximum recommended task size is 1000 KiB.
2023-10-22 17:45:46.526 | INFO     | __main__:group_and_count_ordered:32 - Stopping the Spark session.


             town  count
0      Bridgeport  34201
1        Stamford  32529
2       Waterbury  28506
3         Norwalk  23960
4       New Haven  21346
5         Danbury  20350
6   West Hartford  19854
7        Hartford  18810
8         Milford  17749
9         Meriden  17502
10      Greenwich  17390
11        Bristol  16915
12      Stratford  16688
13    New Britain  16405
14     Manchester  16380
15         Hamden  16192
16      Fairfield  15898
17  East Hartford  13732
18     Torrington  13172
19     Middletown  12403


In [12]:
# Example usage: Saves a Pandas DataFrame to Azure Data Lake Storage Gen2.
write_df_to_adls2(
    account_name="montrealadls",
    account_key="dWksQ33gDM56isvYdBv0U/lrOSwK5QQPfLRKCKJagYBhc0pR4UIb2GpPj+tvMT6oFUX24J/fi8lv+AStybQh1g==",
    file_system_name="montrealfilesystem",
    file_path="stest.csv",
    df=df
)

2023-10-22 17:45:47.535 | INFO     | __main__:write_df_to_adls2:12 - Writing DataFrame to Azure Data Lake Storage Gen2 at stest.csv in montrealfilesystem.
2023-10-22 17:45:47.537 | DEBUG    | __main__:write_df_to_adls2:17 - Creating Data Lake service client.
2023-10-22 17:45:47.538 | DEBUG    | __main__:write_df_to_adls2:24 - Getting file system client for: montrealfilesystem.
2023-10-22 17:45:47.540 | DEBUG    | __main__:write_df_to_adls2:28 - Getting file client for: stest.csv.
2023-10-22 17:45:47.541 | DEBUG    | __main__:write_df_to_adls2:32 - Converting DataFrame to CSV format.
2023-10-22 17:45:50.645 | INFO     | __main__:write_df_to_adls2:38 - Uploading CSV content to Azure Data Lake Storage Gen2.
2023-10-22 17:46:05.982 | SUCCESS  | __main__:write_df_to_adls2:40 - Successfully uploaded DataFrame to stest.csv.
