In [11]:
# # Create a Spark session
# import pandas as pd
# from glob import glob
# from pyspark.sql import SparkSession

# spark= SparkSession.builder \
#     .appName("ReadDataPyspark") \
#     .config("spark.executor.memory", "6g") \
#     .config("spark.driver.memory", "6g") \
#     .config("spark.executor.cores", "4") \
#     .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
#     .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC") \
#     .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7")\
#     .getOrCreate() # .config("spark.sql.shuffle.partitions", "100") \


In [2]:
def read_excel_file(spark, file_path, sheet_name=None, sheet_index=0):
    """
    Read one worksheet of an Excel workbook into a Spark DataFrame.

    The worksheet is selected through the spark-excel ``dataAddress`` option,
    which is built as ``'<sheet>'!A1``. A non-empty ``sheet_name`` takes
    precedence; otherwise ``sheet_index`` is placed in the same quoted position.

    Args:
        spark: Session object whose ``read`` attribute starts the reader chain.
        file_path (str): Path passed to the reader's ``load`` call.
        sheet_name (str, optional): Worksheet name. ``None`` and ``""`` both
            mean "no name given".
        sheet_index (int, optional): Used only when no sheet name is given.
            Defaults to 0.

    Returns:
        Whatever the reader's ``load`` call returns (a DataFrame read with the
        ``header`` and ``inferSchema`` options set to ``"true"``).

    Raises:
        ValueError: If neither a non-empty ``sheet_name`` nor a ``sheet_index``
            is supplied.
    """
    # Validate with the same notion of "name given" that the selection below
    # uses; otherwise sheet_name="" with sheet_index=None would slip through
    # and produce the meaningless address "'None'!A1".
    if not sheet_name and sheet_index is None:
        raise ValueError("Either sheet_name or sheet_index must be provided.")

    # NOTE(review): the index is placed in the quoted sheet position exactly as
    # before -- confirm the installed spark-excel version resolves it by index.
    sheet_ref = sheet_name if sheet_name else sheet_index
    data_address = f"'{sheet_ref}'!A1"

    return (
        spark.read.format("com.crealytics.spark.excel")
        .option("dataAddress", data_address)
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file_path)
    )

In [3]:
import os
from functools import reduce
from glob import glob

from pyspark.sql.functions import lit

def read_all_excel_files(spark, directory_path, sheet_name=None, sheet_index=0):
    """
    Read every ``*.xlsx`` workbook directly inside a directory into one DataFrame.

    Each workbook is read with ``read_excel_file`` and tagged with a
    ``file_path`` column holding the path it came from. The per-file DataFrames
    are then combined with ``unionByName(..., allowMissingColumns=True)``.

    Args:
        spark: Session object forwarded to ``read_excel_file``.
        directory_path (str): Directory searched (non-recursively) for
            ``*.xlsx`` files.
        sheet_name (str, optional): Forwarded to ``read_excel_file``.
        sheet_index (int, optional): Forwarded to ``read_excel_file``.

    Returns:
        pyspark.sql.DataFrame: A single DataFrame containing data from all Excel files.

    Raises:
        ValueError: If the directory contains no readable ``*.xlsx`` workbook.
    """
    # glob returns paths in arbitrary order; sorting makes the union order (and
    # therefore the column order of the result) reproducible between runs.
    file_paths = sorted(
        path
        for path in glob(f"{directory_path}/*.xlsx")
        # "~$<name>.xlsx" is the lock file Excel drops next to an open
        # workbook; it matches the pattern but is not a readable workbook.
        if not os.path.basename(path).startswith("~$")
    )
    if not file_paths:
        raise ValueError(f"No Excel files found in directory {directory_path}.")

    # One DataFrame per workbook, each remembering its source path.
    dfs = [
        read_excel_file(spark, file_path, sheet_name, sheet_index)
        .withColumn("file_path", lit(file_path))
        for file_path in file_paths
    ]

    # Fold the list into a single DataFrame, matching columns by name.
    return reduce(
        lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True),
        dfs,
    )

In [4]:
def read_csv_file(spark, file_path, delimiter=","):
    """
    Load a delimited text file through ``spark.read.csv``.

    Args:
        spark: Session object whose ``read.csv`` method performs the load.
        file_path (str): Path passed as the first argument to ``read.csv``.
        delimiter (str): Field separator, passed as ``sep``. Defaults to ``","``.

    Returns:
        Whatever ``spark.read.csv`` returns for the given options.
    """
    # The schema is inferred here; no explicit schema is supplied.
    reader_kwargs = {
        "header": True,
        "inferSchema": True,
        "sep": delimiter,
        "mode": "DROPMALFORMED",
    }
    return spark.read.csv(file_path, **reader_kwargs)

In [1]:
def read_csv_large_file(spark, file_path, delimiter=","):
    """
    Load CSV data through the generic ``format("csv")`` reader.

    Unlike ``read_csv_file`` this sets ``recursiveFileLookup`` so the reader is
    asked to look for files below ``file_path`` as well.

    Args:
        spark: Session object whose ``read`` attribute starts the reader chain.
        file_path (str): Path passed to the reader's ``load`` call.
        delimiter (str): Value of the ``delimiter`` option. Defaults to ``","``.

    Returns:
        Whatever the reader's ``load`` call returns.
    """
    # Options are applied in this fixed order; no explicit schema is set.
    reader_options = (
        ("delimiter", delimiter),
        ("header", "true"),
        ("recursiveFileLookup", "true"),
        ("inferSchema", "true"),
        ("mode", "DROPMALFORMED"),
    )

    reader = spark.read.format("csv")
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)
    return reader.load(file_path)

In [6]:
import re

def rename_columns(df):
    """
    Replace whitespace and hyphens in column names with underscores.

    Every maximal run of whitespace characters and/or hyphens in a column name
    collapses into a single underscore, so ``"a - b"`` becomes ``"a_b"``.
    All other characters (parentheses, dots, ...) are left untouched.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to rename columns in.

    Returns:
        pyspark.sql.DataFrame: The DataFrame with renamed columns.
    """
    for column in df.columns:
        # A single character class is required here: with the alternation
        # "\s+|-+" a mixed run such as " - " is matched as three separate runs
        # and turns into "___" instead of one underscore.
        new_column = re.sub(r"[\s\-]+", "_", column)
        # Skip names that are already clean so no pointless rename is issued.
        if new_column != column:
            df = df.withColumnRenamed(column, new_column)
    return df


In [4]:
def write_to_sql(spark, spark_df, host_url, tb_name, user, pw,
                 mode="overwrite", batch_size=60000, num_partitions=None):
    """
    Writes a Spark DataFrame to a SQL database over JDBC.

    The write always uses the ``com.microsoft.sqlserver.jdbc.SQLServerDriver``
    driver class. Failures are caught, reported with ``print`` and NOT
    re-raised, so the function returns normally whether or not the write
    succeeded.

    Parameters:
        spark (SparkSession): The Spark session object. Not used by this
            function; kept so existing call sites stay valid.
        spark_df (DataFrame): The Spark DataFrame to be written to SQL.
        host_url (str): The JDBC URL for the SQL database.
        tb_name (str): The target table name in the SQL database.
        user (str): The username for the SQL database.
        pw (str): The password for the SQL database.
        mode (str): Save mode (default is "overwrite"). Other options include "append", "ignore", "error".
        batch_size (int): The number of records to write in each batch (default is 60,000).
        num_partitions (int, optional): If given, the DataFrame is repartitioned
            to this many partitions before writing. When omitted, the DataFrame
            is written with its current partitioning.

    Returns:
        None
    """
    try:
        frame = spark_df
        # Repartition only on request: repartitioning to the partition count
        # the DataFrame already has would force a full shuffle without changing
        # the write parallelism.
        if num_partitions is not None:
            frame = frame.repartition(num_partitions)

        (
            frame.write
            .format("jdbc")
            .option("url", host_url)
            .option("dbtable", tb_name)
            .option("user", user)
            .option("password", pw)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .option("batchsize", batch_size)
            .mode(mode)
            .save()
        )

        print(f"Successfully loaded data into table: {tb_name}")

    except ValueError as error:
        print(f"ValueError occurred during SQL write: {error}")
    except Exception as e:
        # Best-effort by design: report the failure and let the caller continue.
        print(f"An unexpected error occurred: {e}")