# All Imports

In [8]:
import csv
import pandas as pd
import re
import unicodedata
import os
import logging
import datetime as dt

# Declare All Functions

In [9]:
# Setup logging
logging.basicConfig(filename='processing_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# prompt: Create a function to remove duplicate records based on specified columns. Add the duplicates records to separate dataframe and drop them from the original. Include error checking and logging.

def remove_duplicate_records(df, columns):
    """
    Removes duplicate records based on specified columns.
    Adds duplicate records to a separate dataframe and drops them from the original.

    Args:
        df (pd.DataFrame): The dataframe to process.
        columns (list): A list of column names to consider for duplicate detection.

    Returns:
        tuple: A tuple containing the updated dataframe with unique records
               and a new dataframe with duplicate records.
    """
    try:
        print(f"Processing dataframe to remove and store duplicate records.")
        logging.info(f"Processing dataframe to remove and store duplicate records.")
        duplicate_df = pd.DataFrame()
        df_deduplicated = df.drop_duplicates(subset=columns, keep='first')
        duplicate_rows = df[~df.index.isin(df_deduplicated.index)]

        if not duplicate_rows.empty:
            duplicate_df = pd.concat([duplicate_df, duplicate_rows], ignore_index=True)

        print(f"Duplicate removal complete. Duplicate records appended to duplicate_df.")
        logging.info(f"Duplicate removal complete. Duplicate records appended to duplicate_df.")
        return df_deduplicated, duplicate_df

    except Exception as e:
        print(f"Error occurred during duplicate removal: {e}")
        logging.error(f"Error occurred during duplicate removal: {e}")
        return df, pd.DataFrame()

# prompt: Create a function to process a specified CSV file and then run the function to remove duplicates and convert the valid and duplicates dataframes to csv files.

def process_duplicates_csv(file_path, output_valid_csv, output_duplicates_csv, columns, sep=','):
    """
    Processes a single CSV file, removes duplicates, and outputs valid and duplicate dataframes to CSV files.
    """
    try:
        df = pd.read_csv(file_path, sep=sep, low_memory=True)
        print(f"Processing CSV file: {file_path} for duplicates.")
        logging.info(f"Processing CSV file: {file_path} for duplicates.")

        # Remove duplicates based on email
        df, duplicates_df = remove_duplicate_records(df, columns)

        # Save valid and duplicate dataframes to CSV files
        df.to_csv(output_valid_csv, index=False)
        duplicates_df.to_csv(output_duplicates_csv, index=False)

        print(f"Processed file: {file_path}. Valid data saved to {output_valid_csv}, duplicates to {output_duplicates_csv}.")
        logging.info(f"Processed file: {file_path}. Valid data saved to {output_valid_csv}, duplicates to {output_duplicates_csv}.")

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        logging.error(f"Error processing file {file_path}: {e}")


# prompt: Create a function to split a large csv into chunks in a specified folder or path using the chunksize parameter in read_csv

def split_csv_into_chunks(file_path, chunksize, output_directory, sep=','):
  """Splits a large CSV file into smaller chunks using the chunksize parameter.

  Args:
    file_path: The path to the large CSV file.
    chunksize: The number of rows per chunk.
    output_directory: The directory where the chunks should be saved.
  """
  try:
    print(f"Processing CSV file: {file_path} to split into chunks.")
    logging.info(f"Processing CSV file: {file_path} to split into chunks.")

    if not os.path.exists(output_directory):
      os.makedirs(output_directory)

    for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunksize, sep=sep)):
      output_file = os.path.join(output_directory, f"chunk_{i+1}.csv")
      chunk.to_csv(output_file, index=False)

    print(f"File '{file_path}' split into {i+1} chunks in '{output_directory}'.")
    logging(f"File '{file_path}' split into {i+1} chunks in '{output_directory}'.")

  except FileNotFoundError as e:
    print(f"Error: {e}")
    logging.error(f"Error: {e}")
  except Exception as e:
    print(f"An unexpected error occurred when splitting CSV into chunks: {e}")
    logging.error(f"An unexpected error occurred when splitting CSV into chunks: {e}")

# prompt: Create a function to get chunked CSVs from a specified folder, runs the validation functions and outputs the cleaned chunks in specified folders. Include error checking and logging.

def process_chunked_csvs_output_folders(input_folder, output_valid_folder, output_error_folder, email_column_name='mail_address', date_columns=['created_at']):
  """
  Processes chunked CSV files from a specified folder, runs validation functions,
  and outputs the cleaned chunks in specified folders. Includes error checking and logging.

  Args:
      input_folder (str): The path to the folder containing chunked CSV files.
      output_valid_folder (str): The path to the folder to output cleaned chunks.
      output_error_folder (str): The path to the folder to output chunks with errors.
      email_column_name (str): The name of the email column.
      date_columns (list): A list of column names to consider for date validation.
  """

  try:
    # Setup logging
    #logging.basicConfig(filename='processing_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    print(f"Processing chunks from: {input_folder} for data cleaning and output to folders.")
    logging.info(f"Processing chunks from: {input_folder} for data cleaning and output to folders.")

    for filename in os.listdir(input_folder):
      if filename.endswith(".csv"):
        file_path = os.path.join(input_folder, filename)
        print(f"Processing file: {file_path}")
        logging.info(f"Processing file: {file_path}")

        try:
          df = pd.read_csv(file_path, low_memory=True)

          # Run validation functions
          #df['gender'] = df['gender'].replace({0.0: 'M', 1.0: 'F'})
          df, chunk_error_df = validate_and_remove_invalid_emails(df, email_column_name)
          df = remove_time_from_date(df, date_columns)

          # Output cleaned chunk
          if not os.path.exists(output_valid_folder):
            os.makedirs(output_valid_folder)

          output_valid_file = os.path.join(output_valid_folder, f"valid_{filename}")
          df.to_csv(output_valid_file, index=False)
          print(f"Final valid data saved to {output_valid_file}.")
          logging.info(f"Final valid data saved to {output_valid_file}.")

          # Output chunk with errors
          if not os.path.exists(output_error_folder):
            os.makedirs(output_error_folder)

          output_error_file = os.path.join(output_error_folder, f"error_{filename}")
          chunk_error_df.to_csv(output_error_file, index=False)
          print(f"Final error data saved to {output_error_file}.")
          logging.info(f"Final error data saved to {output_error_file}.")

          print(f"File from {file_path} processed successfully.")
          logging.info(f"File {file_path} processed successfully.")

        except Exception as e:
          print(f"Error processing file {file_path}: {e}")
          logging.error(f"Error processing file {file_path}: {e}")

  except Exception as e:
    print(f"Critical error during processing: {e}")
    logging.critical(f"Critical error during processing: {e}")

# prompt: Create a function to remove the time from a date in specified columns

def remove_time_from_date(df, columns):
  """Removes the time component from date columns in a DataFrame.

  Args:
    df: The DataFrame containing the date columns.
    columns: A list of column names to process.

  Returns:
    The DataFrame with the time component removed from the specified columns.
  """
  try:
    print(f"Removing time from specified date columns: {columns}")
    logging.info(f"Removing time from specified date columns: {columns}")
    for column in columns:
      if column in df.columns:
        # Convert to datetime if not already
        df[column] = pd.to_datetime(df[column], errors='coerce')
        # Remove the time component
        df[column] = df[column].dt.date
    print("Date cleaning complete. Time removed from date columns.")
    logging.info("Date cleaning complete. Time removed from date columns.")
    return df
  except Exception as e:
    print(f"An error occurred during removing time from date function: {e}")
    logging.error(f"An error occurred during removing time from date function: {e}")
    return df


# prompt: create a function to validate emails addresses in a dataframe, append the records with an invalid email address to a dataframe and drop them from the original dataframe. Return both the updated dataframe and the error dataframe. Include error checking and loggging.

def validate_and_remove_invalid_emails(df, email_column):
    """
    Validates email addresses in a dataframe, appends records with invalid email
    addresses to a new dataframe, and removes them from the original dataframe.

    Args:
        df (pd.DataFrame): The dataframe containing email addresses.
        email_column (str): The name of the column containing email addresses.

    Returns:
        tuple: A tuple containing the updated dataframe with valid email addresses
            and a new dataframe with records containing invalid email addresses.
    """
    try:
        print(f"Removing invalid emails from {email_column}.")
        logging.info(f"Removing invalid emails from {email_column}.")

        # Regular expression for basic email validation
        email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

        # Create a new dataframe to store records with invalid email addresses
        error_df = pd.DataFrame()

        # Iterate through the dataframe and validate email addresses
        for index, row in df.iterrows():
            email = row[email_column]
            if not re.match(email_regex, email):
                # Append record to the error dataframe
                error_df = pd.concat([error_df, pd.DataFrame([row])], ignore_index=True)
                # Drop the record from the original dataframe
                df.drop(index, inplace=True)

        print("Validation complete. Invalid email records appended to error_df.")
        logging.info("Validation complete. Invalid email records appended to error_df.")
        return df, error_df

    except Exception as e:
        print(f"Error occurred during email validation: {e}")
        logging.error(f"Error occurred during email validation: {e}")
        return df, pd.DataFrame()  # Return empty error dataframe in case of error


# prompt: Create a function to get chunked csvs from a specified folder, runs the validation functions and merges the chunks into a specified final valid csv file and final error csv file. INclude error checking and logging.

def process_chunked_csvs(input_folder, output_valid_csv, output_error_csv, email_column_name='mail_address', date_columns=['created_at']):
  """
  Processes chunked CSV files from a specified folder, runs validation functions,
  and merges the results into final valid and error CSV files.

  Args:
      input_folder (str): The path to the folder containing chunked CSV files.
      output_valid_csv (str): The path to the output CSV file for valid records.
      output_error_csv (str): The path to the output CSV file for error records.
      output_duplicates_csv (str): The path to the output CSV file for duplicate records.
      email_column_name (str): The name of the email column.
      date_columns (list): A list of column names to consider for date validation.
  """

  try:
    # Setup logging
    #logging.basicConfig(filename='processing_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    valid_df = pd.DataFrame()
    error_df = pd.DataFrame()

    print(f"Processing chunks for data cleaning & combining from {input_folder} to single cleaned file: {output_valid_csv}")
    logging.info(f"Processing chunks for data cleaning & combining from {input_folder} to single cleaned file: {output_valid_csv}")

    for filename in os.listdir(input_folder):
      if filename.endswith(".csv"):
        file_path = os.path.join(input_folder, filename)
        print(f"Processing chunk: {file_path}")
        logging.info(f"Processing chunk: {file_path}")

        try:
          df = pd.read_csv(file_path, low_memory=True)

          # Run validation functions
          #df['gender'] = df['gender'].replace({0.0: 'M', 1.0: 'F'})
          df, chunk_error_df = validate_and_remove_invalid_emails(df, email_column_name)
          df = remove_time_from_date(df, date_columns)
          #df['gender'] = df['gender'].astype(int)
          #df['birthday_on'] = df['birthday_on'].dt.date
          # df = truncate_large_fields(df)
          # df = remove_non_utf_characters(df)

          valid_df = pd.concat([valid_df, df], ignore_index=True)

          # Concatenate error dataframes
          #chunk_error_df = pd.concat([chunk_error_df,chunk_dup_error_df], ignore_index=True)
          error_df = pd.concat([error_df, chunk_error_df], ignore_index=True)

          print(f"File {file_path} processed successfully.")
          logging.info(f"File {file_path} processed successfully.")

        except Exception as e:
          print(f"Error processing file {file_path}: {e}")
          logging.error(f"Error processing file {file_path}: {e}")

    # Remove duplicates from full dataframe
    #valid_df, duplicates_df = remove_duplicate_records(valid_df, ['mail_address']) #remove duplicates based on email
    #error_df = pd.concat([error_df, chunk_dup_error_df], ignore_index=True)

    # Save final dataframes
    valid_df.to_csv(output_valid_csv, index=False)
    error_df.to_csv(output_error_csv, index=False)
    #duplicates_df.to_csv(output_duplicates_csv, index=False)

    print(f"Final cleaned data saved to {output_valid_csv}.")
    logging.info(f"Final cleaned data saved to {output_valid_csv}.")
    print(f"Final garbage data saved to {output_error_csv}.")
    logging.info(f"Final garbage data saved to {output_error_csv}.")
    #logging.info(f"Final duplicates data saved to {output_duplicates_csv}.")

  except Exception as e:
    print(f"Critical error during processing of chunks: {e}")
    logging.critical(f"Critical error during processing of chunks: {e}")

# prompt: Create a function that will combine csv chunks from a specified folder into one csv file

def combine_csv_chunks(input_folder, output_file):
  """
  Combines multiple CSV chunks from a folder into a single CSV file.

  Args:
    input_folder: The path to the folder containing CSV chunks.
    output_file: The path to the output CSV file.
  """

  combined_df = pd.DataFrame()
  print(f"Combining CSV chunks from {input_folder}")
  logging.info(f"Combining CSV chunks from {input_folder}")

  for filename in os.listdir(input_folder):
    if filename.endswith(".csv"):
      file_path = os.path.join(input_folder, filename)
      try:
        df = pd.read_csv(file_path)
        combined_df = pd.concat([combined_df, df], ignore_index=True)
      except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        logging.error(f"Error reading file {file_path}: {e}")

  combined_df.to_csv(output_file, index=False)
  print(f"Combined CSV chunks saved to {output_file}")
  logging.info(f"Combined CSV chunks saved to {output_file}")


# **Optional Step: Function to remove invalid rows from a CSV**

In [None]:
# prompt: Create a function that removes invalid rows from a CSV. Detect the number of columns based on the header and the specified delimiter.
# Rows where the number of columns don't match the expected number of columns and may cause problems when trying to use the read_csv function.
# Create a new CSV file with the valid rows.

def remove_invalid_rows(input_file, output_file, delimiter=','):
  """Removes invalid rows from a CSV file.

  Args:
    input_file: Path to the input CSV file.
    output_file: Path to the output CSV file.
    delimiter: Delimiter used in the CSV file.
  """

  with open(input_file, 'r', encoding='utf-8') as infile, \
       open(output_file, 'w', newline='', encoding='utf-8') as outfile:

    reader = csv.reader(infile, delimiter=delimiter)
    writer = csv.writer(outfile, delimiter=delimiter)

    header = next(reader)  # Read the header row
    expected_num_columns = len(header)
    writer.writerow(header)  # Write the header to the output file

    for row in reader:
      if len(row) == expected_num_columns:
        writer.writerow(row)

# prompt: Create a function that removes invalid rows from a CSV. Detect the number of columns based on the header and the specified delimiter.
# Rows where the number of columns don't match the expected number of columns and may cause problems when trying to use the read_csv function.

def remove_invalid_rows_from_csv(csv_file_path, delimiter=','):
  """Removes rows from a CSV file that have an invalid number of columns.

  Args:
    csv_file_path: The path to the CSV file.
    delimiter: The delimiter used in the CSV file.

  Returns:
    A list of valid rows.
  """

  with open(csv_file_path, 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter=delimiter)
    header = next(reader)  # Get the header row
    expected_num_columns = len(header)
    valid_rows = [header]  # Start with the header

    for row in reader:
      if len(row) == expected_num_columns:
        valid_rows.append(row)
      else:
        print(f"Warning: Skipping row with invalid number of columns: {row}")

  return valid_rows

# Example usage:
# valid_rows = remove_invalid_rows_from_csv('my_file.csv')



# **Step 1: Function to run: check for duplicates, then use the valid CSV to create the chunks for further processing.**

In [3]:
# Check for Duplicates in the Original CSV:
process_duplicates_csv('/content/lifebear.csv', 'valid_data.csv', 'duplicate_data.csv', sep=';', columns=['login_id', 'mail_address'])


  df = pd.read_csv(file_path, sep=sep, low_memory=True)


Duplicate removal complete. Duplicate records appended to duplicate_df.
Processed file: /content/lifebear.csv. Valid data saved to valid_data.csv, duplicates to duplicate_data.csv.


# ** Step 2: Function to split chunks based on the chunksize**

In [4]:
# Split the large CSV into chunks for further processing:
# Replace 'your_large_file.csv' with the actual path to your file
# Replace 'output_chunks_folder' with the desired output directory
#split_csv_into_chunks('/content/lifebear.csv', 1000000, '/content/chunks', sep=';')
split_csv_into_chunks('/content/valid_data.csv', 1000000, '/content/chunks', sep=',')


  for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunksize, sep=sep)):


File '/content/valid_data.csv' split into 4 chunks in '/content/chunks'.


# *Step 3: Run data cleaning functions and export chunks to specified output folders*

In [5]:
# Run data cleaning functions and export chunks to specified output folders:
process_chunked_csvs_output_folders('/content/chunks', '/content/cleaned_chunks', '/content/error_chunks')


Validation complete. Invalid email records appended to error_df.
Validation complete. Invalid email records appended to error_df.


  df = pd.read_csv(file_path, low_memory=True)


Validation complete. Invalid email records appended to error_df.
Validation complete. Invalid email records appended to error_df.


# *Step 3 (Alternate): Run data cleaning functions and combine chunks to specified CSVs*

In [6]:
# Get chunked csvs from a specified folder, runs the validation functions and merges the chunks into a specified final valid csv file and final error csv file. INclude error checking and logging.
# Run data cleaning functions and combine chunks to specified CSVs
process_chunked_csvs('/content/chunks', 'final_valid_data.csv', 'final_error_data.csv')

Validation complete. Invalid email records appended to error_df.
Validation complete. Invalid email records appended to error_df.


  df = pd.read_csv(file_path, low_memory=True)


Validation complete. Invalid email records appended to error_df.
Validation complete. Invalid email records appended to error_df.


# Step 4 (Optional): Function to Combine Chunks into a Single CSV

In [10]:
# Combine chunks into a single CSV:
# Replace 'your_chunks_folder' and 'combined_file.csv' with your actual paths
combine_csv_chunks('/content/cleaned_chunks', 'combined_cleaned_data.csv')


Combining CSV chunks from /content/cleaned_chunks


  df = pd.read_csv(file_path)


Combined CSV chunks saved to combined_cleaned_data.csv


In [None]:
# prompt: Create code to read a csv and show sample of dataframe

# Replace 'your_file.csv' with the actual path to your CSV file
df = pd.read_csv('/content/final_valid_data.csv')

# Show a sample of the dataframe (e.g., the first 5 rows)
print(df.head())


  df = pd.read_csv('/content/final_valid_data.csv')


   id    login_id              mail_address                          password  \
0   1    sugimoto   sugimoto@lifebear.co.jp  f0bac04aa1b45cf443d722d6f71c0250   
1   2         kou  nakanishi@lifebear.co.jp  48207c322ee5bb156ffec9f08c960aaa   
2   3      yusuke     yuozawa1208@gmail.com  048261a8024ce51d379eb53cc51aaf33   
3   4  entyan1106        endo1106@gmail.com  cd77a9dac26260a104facda5665eb3ab   
4   5      kuriki          kuriki@wavy4.com  a026597c294cc48cd20ae361f10cbab1   

   created_at          salt birthday_on  gender  
0  2012-01-13  yGwBKynnsctI  1984-11-09     0.0  
1  2012-01-14  aha6EuRYCDvU  1986-11-13     0.0  
2  2012-01-17  PVS59dPWk9BH  1984-12-08     0.0  
3  2012-01-17  vLZI6TVCJowN  1987-11-06     0.0  
4  2012-01-17  swFznWWk79fg  1986-10-21     0.0  


In [None]:
# prompt: Generate code to show all duplicate mail_address from lifebear.csv

import pandas as pd

# Replace 'lifebear.csv' with the actual path to your CSV file
df = pd.read_csv('/content/lifebear.csv', sep=";", low_memory=True)

# Check if 'mail_address' exists in the DataFrame
if 'mail_address' in df.columns:
    # Find duplicate mail_address entries
    duplicate_emails = df[df.duplicated(subset=['mail_address'], keep=False)]  # keep=False shows all duplicates
    # Print the duplicate email addresses
    print(duplicate_emails)
else:
    print("The 'mail_address' column does not exist in the DataFrame.")

duplicate_emails.to_csv('duplicate_emails.csv', index=False)

  df = pd.read_csv('/content/lifebear.csv', sep=";", low_memory=True)


               id         login_id                    mail_address  \
136           138        maaam1120             ammma1120@gmail.com   
221           223         exuernok            2br02b1215@gmail.com   
227           229     takayuki0930          takayuki0930@gmail.com   
332           334           hiromi       hiromi.sono.111@gmail.com   
621           623          UZUMAME       uzumame.uzumame@gmail.com   
...           ...              ...                             ...   
3679894  11593957          0enaka0           dara0o0arad@gmail.com   
3679926  11594071         ns109097             ns109097@icloud.com   
3679964  11594208      takayamasae        as_coco_0520@yahoo.co.jp   
3680042  11594511          hyx0630             hyx0630@ezweb.ne.jp   
3680353  11595638  relapishoptest9  relapishoptest9@lifebear.co.jp   

                                 password           created_at          salt  \
136      f2dea97eab78a50d6cc615c2f172c890  2012-05-28 11:58:21  48dpW9JT7u1w   