In [1]:
import mysql.connector
from mysql.connector import Error
import os
import logging
from datetime import datetime
from tqdm import tqdm 

In [2]:
# Send every log record at INFO level or above to 'weather_data_log.log'.
# The file is opened in append mode, so earlier runs are kept.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='weather_data_log.log',
    filemode='a',
)

In [3]:
# Database connection settings passed to mysql.connector.connect().
# Each value can be supplied through an environment variable (DB_HOST, DB_USER,
# DB_PASSWORD, DB_NAME) so real credentials never have to be saved in the
# notebook. When a variable is unset, the original placeholder value is used.
db_config = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'user': os.environ.get('DB_USER', 'root'),
    'password': os.environ.get('DB_PASSWORD', ''),
    'database': os.environ.get('DB_NAME', ''),
}

In [4]:
def db_connection(config):
    """Open a MySQL connection from a dict of connection settings.

    Args:
        config (dict): Keyword arguments forwarded to mysql.connector.connect().

    Returns:
        The connection object on success, or None when the connector raises
        mysql.connector.Error (the error is written to the log).
    """
    try:
        conn = mysql.connector.connect(**config)
    except mysql.connector.Error as error:
        logging.error(f"Error connecting to the database: {error}")
        return None
    else:
        return conn

In [5]:
def _parse_weather_line(line, station_id):
    """Turn one whitespace-separated data line into a row tuple for insertion.

    Args:
        line (str): Raw line holding a date (YYYYMMDD) and three measurements.
        station_id (str): Identifier stored alongside every row of the file.

    Returns:
        tuple | None: (date, max_temp, min_temp, precipitation, station_id), or
        None when the line is blank.

    Raises:
        ValueError: If the line does not hold exactly four fields or a
            measurement is not numeric.
    """
    fields = line.split()
    if not fields:
        return None
    # Unpacking raises ValueError when the field count is not exactly four.
    date, max_temp_tenths, min_temp_tenths, precipitation_tenths = fields

    def scaled(raw_value):
        # '-9999' marks a missing measurement and is stored as NULL.
        # Other values are given in tenths (per the *_tenths naming), so divide by 10.
        return None if raw_value == '-9999' else float(raw_value) / 10

    formatted_date = f"{date[:4]}-{date[4:6]}-{date[6:]}"  # YYYYMMDD -> YYYY-MM-DD
    # NOTE(review): the previous comment described the scaled precipitation as
    # centimeters; dividing tenths by ten only yields the source's base unit.
    # Confirm the unit against the data set's documentation.
    return (
        formatted_date,
        scaled(max_temp_tenths),
        scaled(min_temp_tenths),
        scaled(precipitation_tenths),
        station_id,
    )


def files_processing(file_path, connection):
    """Parse one station data file and upsert its rows into weather_data.

    The station id is taken from the file name without its extension. Blank
    lines are ignored, and malformed lines are logged and skipped so that a
    single bad line does not abort the whole ingestion. All rows of the file
    are written in one batch; if the batch fails the transaction is rolled
    back and the error is logged.

    Args:
        file_path (str): The path to the data file.
        connection: Connection object for the MySQL database.

    Returns:
        int: Number of records sent to the database for this file, or 0 when
        the file had no usable rows or the insert failed.
    """
    station_id = os.path.splitext(os.path.basename(file_path))[0]
    records_to_insert = []
    with open(file_path, 'r') as data_file:
        for line_number, line in enumerate(data_file, start=1):
            try:
                record = _parse_weather_line(line, station_id)
            except ValueError as error:
                logging.warning(
                    f"Skipping malformed line {line_number} in file {file_path}: {error}"
                )
                continue
            if record is not None:
                records_to_insert.append(record)

    if not records_to_insert:
        return 0

    batch_insert_query = """
        INSERT INTO weather_data (date, max_temp, min_temp, precipitation, station_id)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE 
            max_temp = VALUES(max_temp),
            min_temp = VALUES(min_temp),
            precipitation = VALUES(precipitation);
        """
    cursor = connection.cursor()
    try:
        cursor.executemany(batch_insert_query, records_to_insert)
        connection.commit()
    except mysql.connector.Error as error:
        logging.error(f"Error inserting data from file {file_path}: {error}")
        # Discard any partially applied batch so the next file starts clean.
        try:
            connection.rollback()
        except mysql.connector.Error as rollback_error:
            logging.error(f"Error rolling back after failure on file {file_path}: {rollback_error}")
        return 0
    finally:
        cursor.close()

    # Counts rows submitted; the upsert does not report per-row outcomes here.
    return len(records_to_insert)


In [6]:
def data_ingestion(directory, config):
    """
    Ingest every '.txt' file in a directory into the database.

    Opens one database connection, processes the files in sorted name order
    while showing a progress bar, and logs the start time, end time, duration
    and total record count. The connection is always closed, even when listing
    the directory or processing a file raises.

    Args:
        directory (str): The path to the directory containing data files.
        config: Configuration object for database connection.

    Returns:
        None
    """
    conn = db_connection(config)
    if not conn:
        logging.error("Failed to connect to the database.")
        return

    total_inserted = 0
    start_time = datetime.now()
    logging.info(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info("Starting data ingestion process...")

    try:
        # Sorted so the processing order is the same on every run and platform.
        files = sorted(f for f in os.listdir(directory) if f.endswith('.txt'))

        with tqdm(total=len(files), desc="Ingesting Data", unit="file") as pbar:
            for filename in files:
                full_path = os.path.join(directory, filename)
                total_inserted += files_processing(full_path, conn)
                pbar.update(1)
    finally:
        # Release the connection even if a file fails part-way through.
        conn.close()

    end_time = datetime.now()
    duration = end_time - start_time
    logging.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info(f"Duration: {duration}")
    logging.info(f"Total records processed: {total_inserted}")
    logging.info("Data ingestion completed.")


In [7]:
if __name__ == "__main__":
    # Directory holding the station '.txt' files. Set the WX_DATA_DIR environment
    # variable to point elsewhere; otherwise the original local path is used.
    data_directory = os.environ.get(
        'WX_DATA_DIR',
        'C:/Users/Sravya/Desktop/code-challenge-template/wx_data',
    )
    data_ingestion(data_directory, db_config)

Ingesting Data: 100%|███████████████████████████████████████████████████████████████████████████| 167/167 [01:42<00:00,  1.62file/s]
