# Validating our data


I will dive into the Maji Ndogo agricultural dataset to continue validating the data for use. Before that, I want to build a data pipeline that ingests and cleans our data with the press of a button, which will clean up our code significantly. Once that's ready, I will complete my data validation.

# Introduction

This notebook picks up where the previous automated farm analysis left off.

My main disappointment last time was the mismatch between our dataset and the weather station data. That was a curveball: half of our measurements were out of range, raising eyebrows and doubts alike.


As a quick summary of the previous analysis: half of the means were not within tolerance.

**Hypothesis testing** takes into account both the means and the variances of the distributions being compared. The variance here is crucial because it gives us insight into the spread of the data points around the means for our two datasets. Two samples could have the same mean but very different variances, leading to different interpretations of their similarities or differences.
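
To make the point about variance concrete, here is a minimal sketch using only Python's standard library. The two samples are made up for illustration, and the statistic shown is Welch's t, one common form of the two-sample t-test. That choice is an assumption; the notebook has not yet fixed which test it will use.

```python
import math
import statistics


def welch_t_statistic(sample_a, sample_b):
    """Welch's t statistic: difference in means scaled by the combined standard error."""
    mean_a = statistics.mean(sample_a)
    mean_b = statistics.mean(sample_b)
    # Sample variances (n - 1 in the denominator)
    var_a = statistics.variance(sample_a)
    var_b = statistics.variance(sample_b)
    standard_error = math.sqrt(var_a / len(sample_a) + var_b / len(sample_b))
    return (mean_a - mean_b) / standard_error


# Hypothetical temperature readings: same mean, very different spread
tight_sample = [9.9, 10.0, 10.1, 10.0, 10.0]
wide_sample = [5.0, 15.0, 8.0, 12.0, 10.0]

print(statistics.mean(tight_sample), statistics.variance(tight_sample))
print(statistics.mean(wide_sample), statistics.variance(wide_sample))
print(welch_t_statistic(tight_sample, wide_sample))
```

Both samples average about 10, so the t statistic is close to zero, yet the second sample is far more spread out. Comparing the means alone would hide that difference.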

My main goal is the same: is the data in our `MD_agric_df` dataset representative of reality? To answer this, I use weather-related data from nearby stations to validate the results. If the weather data matches the data we have, we can be more confident that our dataset represents reality.

So my plan is:
1. Create a null hypothesis.
2. Import the `MD_agric_df` dataset and clean it up.
3. Import the weather data.
4. Map the weather data to the field data.
5. Calculate the means of the weather station dataset and the means of the main dataset.
6. Calculate all the parameters we need to do a t-test.
7. Interpret our results.



# Data dictionary
### from Explore AI Dataset

**1. Geographic features**

- **Field_ID:** A unique identifier for each field (BigInt).
 
- **Elevation:** The elevation of the field above sea level in metres (Float).

- **Latitude:** Geographical latitude of the field in degrees (Float).

- **Longitude:** Geographical longitude of the field in degrees (Float).

- **Location:** Province the field is in (Text).

- **Slope:** The slope of the land in the field (Float).

**2. Weather features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Rainfall:** Amount of rainfall in the area in mm (Float).

- **Min_temperature_C:** Average minimum temperature recorded in Celsius (Float).

- **Max_temperature_C:** Average maximum temperature recorded in Celsius (Float).

- **Ave_temps:** Average temperature in Celsius (Float).

**3. Soil and crop features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Soil_fertility:** A measure of soil fertility where 0 is infertile soil, and 1 is very fertile soil (Float).

- **Soil_type:** Type of soil present in the field (Text).

- **pH:** pH level of the soil, which is a measure of how acidic/basic the soil is (Float).

**4. Farm management features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Pollution_level:** Level of pollution in the area where 0 is unpolluted and 1 is very polluted (Float).

- **Plot_size:** Size of the plot in the field (Ha) (Float).

- **Chosen_crop:** Type of crop chosen for cultivation (Text).

- **Annual_yield:** Annual yield from the field (Float). This is the total output of the field. The field size and type of crop will affect the annual yield.

- **Standard_yield:** Standardised yield expected from the field, normalised per crop (Float). This is independent of field size, or crop type. Multiplying this number by the field size, and average crop yield will give the Annual_Yield.

<br>

**Weather_station_data (CSV)**

- **Weather_station_ID:** The weather station the data originated from. (Int)

- **Message:** The weather data was captured by sensors at the stations, in the format of text messages. (Str)

**Weather_data_field_mapping (CSV)**

- **Field_ID:** The id of the field that is connected to a weather station. This is the key we can use to join the weather station ID to the original data. (Int)

- **Weather_station_ID:** The weather station that is connected to a field. If a field has `weather_station_ID = 0` then that field is closest to weather station 0. (Int)

<br>

# Importing pandas
### To avoid errors, I will use Python 3.12 and install PyArrow if a warning appears

In [1]:
import re
import numpy as np
import pandas as pd
import os

We can safely ignore these warnings, but soon our script will fail to import Pandas, so let's fix it today, and we won't have to worry about it for a long time. The warning tells us that Pyarrow will soon be a requirement to import Pandas, so we can just install it with pip. 

In [2]:
%pip install Pyarrow





# Cleaning up our data pipeline

When we pulled in the dataset last time, we assumed that our script worked. We assumed that all the fixes we made to the data made it in, and that the database didn't change. But what if someone added more records, fixed the data in the database, or added a new column of data?


Last time, in automating the farm analysis, I used the code below to import the data (⚠️ Don't run it):

In [None]:
import pandas as pd # importing the Pandas package with an alias, pd
from sqlalchemy import create_engine, text # Importing the SQL interface. If this fails, run !pip install sqlalchemy in another cell.
import matplotlib.pyplot as plt
import seaborn as sns


# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.

In [None]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

In [None]:
MD_agric_df.rename(columns={'Annual_yield': 'Crop_type_Temp', 'Crop_type': 'Annual_yield'}, inplace=True)
MD_agric_df.rename(columns={'Crop_type_Temp': 'Crop_type'}, inplace=True)
MD_agric_df['Elevation'] = MD_agric_df['Elevation'].abs()

# Correcting 'Crop_type' column
def correct_crop_type(crop):
    crop = crop.strip()  # Remove leading and trailing whitespace
    corrections = {
        'cassaval': 'cassava',
        'wheatn': 'wheat',
        'teaa': 'tea'
    }
    return corrections.get(crop, crop)  # Get the corrected crop type, or return the original if not in corrections

# Apply the correction function to the Crop_type column
MD_agric_df['Crop_type'] = MD_agric_df['Crop_type'].apply(correct_crop_type)

In [None]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

In [None]:
import re # Importing the regex pattern
import numpy as np


patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

def extract_measurement(message):
    """
    Extracts a numeric measurement value from a given message string.

    The function applies regular expressions to identify and extract
    numeric values related to different types of measurements such as
    Rainfall, Average Temperatures, and Pollution Levels from a text message.
    It returns the key of the matching record, and first matching value as a floating-point number.
    
    Parameters:
    message (str): A string message containing the measurement information.

    Returns:
    tuple: (measurement key, extracted value as a float) if a match is found;
           otherwise, (None, None).

    The function uses the following patterns for extraction:
    - Rainfall: Matches numbers (including decimal) followed by 'mm', optionally spaced.
    - Temperature: Matches numbers (including decimal) followed by 'C', optionally spaced.
    - Pollution_level: Matches numbers (including decimal) following 'Pollution at' or '='.
    
    Example usage:
    extract_measurement("【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.")
    # Returns: 'Temperature', 12.82
    """
    
    for key, pattern in patterns.items(): # Loop through all of the patterns and check if it matches the pattern value.
        match = re.search(pattern, message)
        if match:
            # Extract the first group that matches, which should be the measurement value if all previous matches are empty.
            # print(match.groups()) # Uncomment this line to help you debug your regex patterns.
            return key, float(next((x for x in match.groups() if x is not None)))
    
    return None, None

# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])
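
To see what the extraction returns, here is a self-contained sketch that reuses the same three patterns on a handful of messages. The sample messages are hypothetical stand-ins, not taken from the weather station file, and the function is a condensed copy of the one above.

```python
import re

# Same patterns as in the cell above
patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
    'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)',
}


def extract_measurement(message):
    """Return (measurement_type, value), or (None, None) if nothing matches."""
    for key, pattern in patterns.items():
        match = re.search(pattern, message)
        if match:
            # Take the first capture group that actually matched
            return key, float(next(x for x in match.groups() if x is not None))
    return None, None


# Hypothetical messages, for illustration only
sample_messages = [
    "Rainfall recorded: 12.5 mm",
    "Current temperature is 12.82C.",
    "Pollution at 0.25",
    "Sensor offline",
]

results = [extract_measurement(message) for message in sample_messages]
for message, result in zip(sample_messages, results):
    print(result, "<-", message)
```

Messages that match none of the patterns come back as `(None, None)`, which is what the later "not assigned yet" check looks for.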

In [None]:
weather_station_means = weather_station_df.groupby(by = ['Weather_station_ID','Measurement'])['Value'].mean(numeric_only = True)
weather_station_means = weather_station_means.unstack()
weather_station_means

In [None]:
# Use this line of code to see which messages are not assigned yet.
weather_station_df[weather_station_df['Measurement'].isna() | weather_station_df['Value'].isna()]

In [None]:

MD_agric_df = MD_agric_df.merge(weather_station_mapping_df,on = 'Field_ID', how='left')
MD_agric_df = MD_agric_df.drop(columns="Unnamed: 0")  # drop() returns a new DataFrame, so assign the result
MD_agric_df_weather_means = MD_agric_df.groupby("Weather_station").mean(numeric_only = True)[['Pollution_level','Rainfall', 'Ave_temps']]

MD_agric_df_weather_means = MD_agric_df_weather_means.rename(columns = {'Ave_temps':"Temperature"})
MD_agric_df_weather_means

With the approach above, we have to copy code across from another notebook. We may miss some blocks or lines of code, or copy the code over in the wrong order, and we may not have documented the code well either.

As a final step, I will automate a few simple data validation checks in our code.


I will create a module to interact with the database, a module to transform and clean the field-related data, and another module to process the weather data.

<br>

So here's the plan: 

1. Gather all of the code from our last "pipeline".

2. Re-organise the code into our new three modules: 

    a. `data_ingestion.py` - All SQL-related functions, and web-based data retrieval.

    b. `field_data_processor.py` - All transformations, cleanup, and merging functionality.

    c. `weather_data_processor.py` - All transformations and cleanup of the weather station data.

3. Copy our code into the modules and test their functionality.

4. Create automated data validation tests to ensure our data is as we expect it to be.


# Modules

The goal is to reduce our data pipeline code to a couple of lines like this (the processor objects do not exist yet, so the cell fails for now):

In [3]:
field_processor.process()
field_df = field_processor.df

weather_processor.process()
weather_df = weather_processor.weather_df

NameError: name 'field_data_processor' is not defined

The benefit of this modular structure is that if we want to debug a problem in the field data, we know where to go, and if we want to import data from other IoT weather sensors, we can just modify the weather data module.

The first challenge is automating the data ingestion. There are two places we're fetching data from:
1. SQLite database - creating an SQLite engine, connecting to the database, running a query, and returning a pandas DataFrame.
2. Web CSV file - Read the CSV data from the web, and import it as a DataFrame.



## Data ingestion

SQL:

In [4]:
import pandas as pd # importing the Pandas package with an alias, pd
from sqlalchemy import create_engine, text # Importing the SQL interface. If this fails, run !pip install sqlalchemy in another cell.
import matplotlib.pyplot as plt
import seaborn as sns


# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)
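
The query joins four tables on `Field_ID`. As a rough illustration of how `LEFT JOIN ... USING` behaves, the sketch below uses Python's built-in `sqlite3` module (swapped in for SQLAlchemy and pandas so it runs without extra packages) on a tiny in-memory database. The two tables are cut down to one extra column each and the rows are made up.

```python
import sqlite3

# In-memory database with two miniature, made-up tables
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE geographic_features (Field_ID INTEGER, Elevation REAL);
CREATE TABLE weather_features (Field_ID INTEGER, Rainfall REAL);
INSERT INTO geographic_features VALUES (1, 786.1), (2, 674.3);
INSERT INTO weather_features VALUES (1, 1125.2);
""")

cursor = conn.execute("""
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
ORDER BY Field_ID
""")
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()

print(columns)
print(rows)
```

With `USING (Field_ID)`, the shared key appears once in the result, and a field with no matching weather row is kept with `None` in the weather columns instead of being dropped.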

CSV files: 

In [5]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

Next, we **convert the data ingestion code into functions** that we can call from the module.

So this code: 

In [7]:
# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

becomes these two functions:

In [8]:
def create_db_engine(db_path):
    engine = create_engine(db_path)
    return engine

def query_data(engine, sql_query):
    with engine.connect() as connection:
        df = pd.read_sql_query(text(sql_query), connection)
        return df

So if we call:

In [9]:
create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

Engine(sqlite:///Maji_Ndogo_farm_survey_small.db)

we get an SQL engine object, which we can pass to `query_data()` together with a query to connect to the database and run it.

In [10]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


We can also call the `create_db_engine()` function inside the `query_data()` call:

In [12]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db'), sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


### The cell below returns an empty DataFrame

In [12]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield


We get an empty DataFrame because SQL returned an empty query result. If we then try to filter the results, we get an answer that does not make sense.

In [15]:
df['Rainfall'] > 100

Series([], Name: Rainfall, dtype: bool)

To avoid this, we need to add error handling to our code so that the process stops if something is wrong and tells us what the problem is before we continue.
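
One reason an empty result is dangerous is that checks over empty data can pass vacuously. The sketch below shows this in plain Python, with lists standing in for DataFrame columns. Both function names are hypothetical and are not part of the pipeline.

```python
def all_above(values, threshold):
    """True if every value exceeds the threshold (also True when there are no values)."""
    return all(value > threshold for value in values)


def all_above_checked(values, threshold):
    """Same check, but refuse to answer when there is nothing to check."""
    if not values:
        raise ValueError("No values to check: the query returned nothing.")
    return all(value > threshold for value in values)


empty_rainfall = []  # what an empty query result would give us

print(all_above(empty_rainfall, 100))  # passes, even though no data was checked

try:
    all_above_checked(empty_rainfall, 100)
except ValueError as error:
    print(f"Stopped early: {error}")
```

The unchecked version reports success on data that was never there, while the guarded version stops the process and says why.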

Secondly, to help us understand how our code is executing, we're going to add some logs. Print statements can help us debug our code, but `logging` is a better tool: we can add `logger.info()` logs to know what our code is doing, and `logger.debug()` statements that carry more detail in case we want to debug a specific loop more closely. There are also various other tools to use, and we can silence all logging with a single line of code. If we used print statements, we would have to comment them out one by one once our code goes into use.
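
As a small illustration of these levels, the sketch below writes log records to an in-memory buffer so the effect of each setting is visible. The logger name `logging_demo` and the messages are made up for this example.

```python
import io
import logging

# Send this demo logger's output to an in-memory buffer
stream = io.StringIO()
demo_logger = logging.getLogger("logging_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root logger

handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
demo_logger.addHandler(handler)

demo_logger.info("Starting ingestion")    # shown: INFO is enabled
demo_logger.debug("Row-level detail")     # hidden: DEBUG is below INFO

demo_logger.setLevel(logging.DEBUG)
demo_logger.debug("Row-level detail now visible")

logging.disable(logging.CRITICAL)         # one line silences all logging
demo_logger.error("This is suppressed")
logging.disable(logging.NOTSET)           # switch logging back on

captured = stream.getvalue().splitlines()
print("\n".join(captured))
```

Only the INFO message and the second DEBUG message reach the buffer. The first DEBUG call is filtered by the level, and the ERROR call is dropped while logging is disabled.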

If we apply these two ideas, we get the code below:

In [16]:
import logging
import pandas as pd
from sqlalchemy import create_engine, text  # needed by create_db_engine() and query_data()

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')

# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError as e: # If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
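
The functions above follow a "log, then re-raise" pattern. Here is a stripped-down sketch of that pattern, using a hypothetical `parse_rainfall` helper that is not part of the pipeline. A bare `raise` re-raises the exception currently being handled, so the caller still sees the original error and traceback.

```python
import logging

logger = logging.getLogger("reraise_demo")


def parse_rainfall(raw_value):
    """Convert a raw string to a float, logging and re-raising on failure."""
    try:
        return float(raw_value)
    except ValueError:
        logger.error("Could not parse rainfall value %r", raw_value)
        raise  # re-raise the active exception unchanged


print(parse_rainfall("1125.2"))

try:
    parse_rainfall("not a number")
except ValueError as error:
    print(f"Caller still sees the error: {error}")
```

Logging before re-raising records where the failure happened while still stopping the pipeline, which is the behaviour we want from the ingestion functions.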

Running the empty query again now raises an error, which prevents any further processing:

In [16]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

2024-02-26 12:15:11,517 - data_ingestion - INFO - Database engine created successfully.
2024-02-26 12:15:11,528 - data_ingestion - ERROR - The query returned an empty DataFrame.
2024-02-26 12:15:11,529 - data_ingestion - ERROR - SQL query failed. Error: The query returned an empty DataFrame.


ValueError: The query returned an empty DataFrame.

Next, we include the CSV data handling. This is the original code:

In [18]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

These two files are imported in the same way, so one function can handle both.

In [19]:
weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
    
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_data = read_from_web_CSV(weather_mapping_data_URL)

2024-05-04 10:44:43,785 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-04 10:44:45,191 - data_ingestion - INFO - CSV file read successfully from the web.


Putting it all together, the code can connect to a database for the field data, use a query to retrieve data, and create a DataFrame.

In [20]:

from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError as e: # If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e


In [21]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

2024-05-04 10:47:55,512 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 10:47:55,785 - data_ingestion - INFO - Query executed successfully.
2024-05-04 10:47:57,480 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-04 10:47:58,855 - data_ingestion - INFO - CSV file read successfully from the web.



Note that there are imports at the top of the cell: the module needs to import packages like SQLAlchemy and pandas.


In [22]:
field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)
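
A shape check like this can be automated. The sketch below compares actual shapes against the shapes printed above and reports any that differ. The function name and the deliberately wrong shape are hypothetical. In the notebook, the `actual` dict would be built from `field_df.shape`, `weather_df.shape` and `weather_mapping_df.shape`.

```python
# Shapes printed above for the three DataFrames
EXPECTED_SHAPES = {
    "field_df": (5654, 18),
    "weather_df": (1843, 2),
    "weather_mapping_df": (5654, 3),
}


def find_shape_mismatches(actual_shapes, expected_shapes=EXPECTED_SHAPES):
    """Return {name: (actual, expected)} for every DataFrame whose shape differs."""
    return {
        name: (actual_shapes.get(name), expected)
        for name, expected in expected_shapes.items()
        if actual_shapes.get(name) != expected
    }


# Made-up example in which the mapping data has lost some rows
actual = {
    "field_df": (5654, 18),
    "weather_df": (1843, 2),
    "weather_mapping_df": (5600, 3),
}
mismatches = find_shape_mismatches(actual)
print(mismatches)
```

An empty result means every shape matches. Anything else names the DataFrame that changed, which is a cheap first signal that the source data is no longer what we expect.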


In [23]:


"""
Module: Data Ingestion

This module handles the ingestion of data from the Maji Ndogo farm survey database and from web-hosted CSV files.

Import Statements: The code imports necessary modules such as create_engine and text from SQLAlchemy, logging, and pandas for data manipulation.

Logger Configuration: It configures a logger named 'data_ingestion' using the Python logging module.
The logger will output log messages of level INFO and above to the console with a specific format including a timestamp, logger name,
and log level.

Database Path: It defines the path to a SQLite database file named 'Maji_Ndogo_farm_survey_small.db'.

"""
from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

In [21]:
### START FUNCTION
from sqlalchemy import create_engine, text
import logging
import pandas as pd
def create_db_engine(db_path):

    """Create a database engine.

    Args:
        db_path (str): The path to the database.

    Returns:
        engine (sqlalchemy.engine.Engine): The SQLAlchemy engine object.

    Raises:
        ImportError: If SQLAlchemy is not installed.
        Exception: If there is an error creating the database engine.
    """

    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine  # Return the engine object if it all works well
    except ImportError as e:  # If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:  # If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e


def query_data(engine, sql_query):
    """Query data: executes a SQL query on the database engine and returns the result as a pandas DataFrame.

    Args:
        engine (sqlalchemy.engine.Engine): The SQLAlchemy engine object.
        sql_query (str): The SQL query to be executed.

    Returns:
        df (pandas.DataFrame): The result of the SQL query as a pandas DataFrame.

    Raises:
        ValueError: if the query returns an empty df
        Exception: if there is an error while querying the database.
    """

    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e:
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e


def read_from_web_CSV(URL):
    """
    Read from web csv: reads csv file from the specified URL and returns it as pandas dataframe.

    Args:
        URL (str): URL from where csv file is located.

    Returns:
        dataframe (pandas.DataFrame): The DataFrame with the csv data.

    Raises:
        pd.errors.EmptyDataError: if URL is not found or it is empty.
        Exception: if the URL is not readable from the web or there is an error while reading the file from the web.
    """

    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e

### END FUNCTION


Testing the code to make sure it works:

In [24]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

2024-05-04 10:51:22,167 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 10:51:22,704 - data_ingestion - INFO - Query executed successfully.
2024-05-04 10:51:24,784 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-04 10:51:26,321 - data_ingestion - INFO - CSV file read successfully from the web.


field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)
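
To see the empty-result check in `query_data()` in isolation, here is a minimal sketch. It assumes an in-memory SQLite database opened with the standard-library `sqlite3` module instead of the SQLAlchemy engine used above, and the helper name `query_or_raise` and the two toy rows are made up for illustration.

```python
import sqlite3

import pandas as pd


def query_or_raise(connection, sql_query):
    """Hypothetical helper: run a query and raise ValueError if no rows come back."""
    df = pd.read_sql_query(sql_query, connection)
    if df.empty:
        raise ValueError("The query returned an empty DataFrame.")
    return df


# Toy stand-in for the survey database (assumption: two made-up rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE geographic_features (Field_ID INTEGER, Elevation REAL)")
conn.executemany(
    "INSERT INTO geographic_features VALUES (?, ?)",
    [(1, 786.1), (2, -674.3)],
)
conn.commit()

# A query that matches rows returns a DataFrame as usual
full_df = query_or_raise(conn, "SELECT * FROM geographic_features")
print(full_df.shape)

# A query that matches nothing raises ValueError instead of returning silently
caught = None
try:
    query_or_raise(conn, "SELECT * FROM geographic_features WHERE Field_ID > 99")
except ValueError as err:
    caught = str(err)
    print(f"Caught: {caught}")

conn.close()
```

Raising on an empty result means a wrong table name or an over-strict filter fails loudly at ingestion time rather than later in the analysis.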


Next, I create a new file, `data_ingestion.py`, containing the functions above, and import the functions into the notebook.

In [25]:
# Importing our new module
from data_ingestion import create_db_engine, query_data, read_from_web_CSV

#Checking if the function names are now associated with the module
print(create_db_engine.__module__)
print(query_data.__module__)
print(read_from_web_CSV.__module__)


data_ingestion
data_ingestion
data_ingestion



The names `create_db_engine`, `query_data` and `read_from_web_CSV` are all linked to the `data_ingestion` module, so the module was imported correctly.

In [26]:
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

2024-05-04 10:52:49,118 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 10:52:49,349 - data_ingestion - INFO - Query executed successfully.
2024-05-04 10:52:51,034 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-04 10:52:52,578 - data_ingestion - INFO - CSV file read successfully from the web.


field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)


## Field data processor

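Now to process the field data. The steps below swap the mislabelled `Annual_yield` and `Crop_type` columns, make the `Elevation` values positive, and correct the misspelled crop names.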

In [27]:
MD_agric_df = field_df.copy()

MD_agric_df.rename(columns={'Annual_yield': 'Crop_type_Temp', 'Crop_type': 'Annual_yield'}, inplace=True)
MD_agric_df.rename(columns={'Crop_type_Temp': 'Crop_type'}, inplace=True)
MD_agric_df['Elevation'] = MD_agric_df['Elevation'].abs()

# Correcting 'Crop_type' column
def correct_crop_type(crop):
    corrections = {
        'cassaval': 'cassava',
        'wheatn': 'wheat',
        'teaa': 'tea'
    }
    return corrections.get(crop, crop)  # Get the corrected crop type, or return the original if not in corrections

# Apply the correction function to the Crop_type column
MD_agric_df['Crop_type'] = MD_agric_df['Crop_type'].apply(correct_crop_type)

Next, I build a class called `FieldDataProcessor` that encapsulates the whole data processing workflow for the field-related data. The class holds a DataFrame attribute and methods that alter that attribute. With all of the logic encapsulated in `FieldDataProcessor`, the details are abstracted away and I only need to call something like `field_processor.process()`.

First, create the class framework:

In [32]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

        self.initialize_logging(logging_level)
        
        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class. 
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # DataFrame methods 
    def ingest_sql_data(self):
        # First we want to get the data from the SQL database
        pass
    
    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order
        
        weather_map_df = self.weather_station_mapping() 
        self.df = self.ingest_sql_data()
        self.df = self.rename_columns()
        self.df = self.apply_corrections()
        self.df = self.df.merge(weather_map_df, on='Field_ID', how='left')
        self.df = self.df.drop(columns="Unnamed: 0")

Instantiating the class and calling a single method, `.process()`, should ingest and clean the data.

In [33]:
# This code won't run for now, since we have not defined all of the methods.
field_processor = FieldDataProcessor()
field_processor.process()

AttributeError: 'NoneType' object has no attribute 'merge'
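
The error comes from the placeholder methods: a method whose body is only `pass` returns `None`, so `self.df` becomes `None` before `.merge()` is called on it. A minimal sketch that reproduces this, using a hypothetical stripped-down class (`StubProcessor` is not part of the pipeline):

```python
class StubProcessor:
    """Hypothetical stand-in for the framework class: the step is still a placeholder."""

    def __init__(self):
        self.df = None

    def ingest_sql_data(self):
        pass  # a body of only `pass` implicitly returns None

    def process(self):
        self.df = self.ingest_sql_data()  # self.df is now None
        self.df = self.df.merge(None)     # None has no .merge attribute


message = None
try:
    StubProcessor().process()
except AttributeError as err:
    message = str(err)
    print(message)
```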

In [30]:
field_df = field_processor.df

### `def ingest_sql_data()`

I create a copy of the class and start filling out the code for the methods, dropping the `.process()` method for now and adding it back once everything works.

Unscramble the code in the `.ingest_sql_data()` method. The method should return the initial DataFrame.

In [35]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
       
        

    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

We can use the code below to check if the code works as expected:

In [36]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_df = field_processor.df
print(field_df.shape)

2024-05-04 11:02:16,233 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:02:17,508 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:02:17,510 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.


(5654, 18)
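
Because the class is redefined and re-instantiated in several cells, the `if not self.logger.handlers` check in `initialize_logging()` matters: `logging.getLogger()` returns the same logger object for the same name, so without the check each new instance would attach another handler and every message would be printed more than once. A minimal sketch of that behaviour, with a hypothetical helper `get_logger` standing in for the method:

```python
import logging


def get_logger(name):
    """Hypothetical helper mirroring the handler check in initialize_logging()."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep messages away from the root logger

    # Only attach a handler the first time this logger name is configured
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
    return logger


first = get_logger("demo.FieldDataProcessor")
second = get_logger("demo.FieldDataProcessor")

print(first is second)        # the same logger object is returned both times
print(len(second.handlers))   # still a single handler after two calls
second.info("This message is printed once.")
```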


### `def rename_columns()`

Adding `rename_columns()`.

Copy the class into the top of this cell, and unscramble the code sections in the `.rename_columns()` method.

In [37]:
# Copy in your class including the ingest_sql_data method here
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df

    def rename_columns(self):
        # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]
        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})

        self.logger.info(f"Swapped columns: {column1} with {column2}")

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

`rename_columns()` does not return anything. It doesn't need to, because it modifies the instance attribute `self.df` directly. This is one benefit of using a class.

The code below instantiates the class, connects to the database, and swaps the column names.



In [38]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_df = field_processor.df
field_df['Annual_yield'].head(3)

2024-05-04 11:04:38,751 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:04:39,230 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:04:39,233 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-05-04 11:04:39,264 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type


0    0.751354
1    1.069865
2    2.208801
Name: Annual_yield, dtype: float64
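
The swap logic in `rename_columns()` can be tried on a toy DataFrame. The sketch below uses made-up values and compares the two-step rename through a temporary name with a single `rename()` call that maps both names at once. In pandas, `rename()` applies the mapping to each label independently, so both routes should give the same result; the temporary name is the more defensive option.

```python
import pandas as pd

# Toy data (assumption): the crop names sit under 'Annual_yield' and the yields under 'Crop_type'
toy = pd.DataFrame({
    "Annual_yield": ["cassava", "tea"],
    "Crop_type": [0.75, 1.07],
})

# Two-step swap through a temporary column name, as in rename_columns()
temp_name = "__temp_name_for_swap__"
two_step = toy.rename(columns={"Annual_yield": temp_name, "Crop_type": "Annual_yield"})
two_step = two_step.rename(columns={temp_name: "Crop_type"})

# Single-step swap: both labels are mapped in one call
one_step = toy.rename(columns={"Annual_yield": "Crop_type", "Crop_type": "Annual_yield"})

print(list(two_step.columns))     # ['Crop_type', 'Annual_yield']
print(two_step.equals(one_step))  # True
```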

### `def apply_corrections()`

Copy the class into the top of this cell and fill in the `.apply_corrections()` method.

In [39]:

import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df

    def rename_columns(self):
        # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]
        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})

        self.logger.info(f"Swapped columns: {column1} with {column2}")

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

Testing that the new method works before moving on to the next one.



field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_processor.apply_corrections()

field_df = field_processor.df
field_df.query("Crop_type in ['cassaval','wheatn']")

The expected result is an empty DataFrame, since the misspelled crop names should have been corrected.
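
The query above checks two of the three misspellings. A slightly broader check, sketched here on a toy DataFrame with made-up values, counts how many rows still contain any key of the corrections dictionary after the corrections are applied.

```python
import pandas as pd

# Same corrections as self.values_to_rename in the class
values_to_rename = {"cassaval": "cassava", "wheatn": "wheat", "teaa": "tea"}

# Toy data (assumption): a few misspelled crops and negative elevations
toy = pd.DataFrame({
    "Crop_type": ["cassaval", "tea", "wheatn", "teaa", "maize"],
    "Elevation": [-786.1, 674.3, -826.5, 574.9, 886.4],
})

# Same two steps as apply_corrections()
toy["Elevation"] = toy["Elevation"].abs()
toy["Crop_type"] = toy["Crop_type"].apply(lambda crop: values_to_rename.get(crop, crop))

# Count rows that still hold any of the known misspellings
leftover = toy["Crop_type"].isin(list(values_to_rename)).sum()
print(leftover)                        # 0
print(toy["Crop_type"].tolist())       # ['cassava', 'tea', 'wheat', 'tea', 'maize']
print((toy["Elevation"] >= 0).all())   # True
```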

### `def weather_station_mapping()`

In [41]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df

    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        weather_map_df = read_from_web_CSV(self.weather_map_data)
        self.df = self.df.merge(weather_map_df, on = 'Field_ID', how = 'left')
        return read_from_web_CSV(self.weather_map_data)

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order.
        pass

Once again, I test the new method.



In [42]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_processor.apply_corrections()
field_processor.weather_station_mapping()

field_df = field_processor.df
field_df['Weather_station'].unique()

2024-05-04 11:07:05,510 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:07:05,820 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:07:05,821 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-05-04 11:07:23,182 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-04 11:07:30,823 - data_ingestion - INFO - CSV file read successfully from the web.


array([4, 0, 1, 2, 3], dtype=int64)
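
To make the merge step concrete, here is a minimal sketch on toy data. The field IDs and values are made up, and the `Unnamed: 0` column in the mapping table is an assumption based on the `drop(columns="Unnamed: 0")` call in the class framework earlier. A left merge keeps every field row, and `validate="many_to_one"` asks pandas to raise an error if a `Field_ID` appears more than once in the mapping.

```python
import pandas as pd

# Toy field data (assumption: made-up IDs and elevations)
fields = pd.DataFrame({
    "Field_ID": [40734, 30629, 39924],
    "Elevation": [786.06, 674.33, 826.53],
})

# Toy mapping table; 'Unnamed: 0' mimics a saved index column (assumption)
mapping = pd.DataFrame({
    "Unnamed: 0": [0, 1],
    "Field_ID": [40734, 30629],
    "Weather_station": [4, 0],
})

# Left merge: all field rows are kept, unmatched fields get NaN for Weather_station
merged = fields.merge(mapping, on="Field_ID", how="left", validate="many_to_one")
merged = merged.drop(columns="Unnamed: 0")

print(merged.shape)                            # (3, 3)
print(merged["Weather_station"].isna().sum())  # 1 field has no station in the toy mapping
```

Counting the missing `Weather_station` values after the merge is a quick way to confirm that every field was mapped to a station.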

### `def process()`

Now to put it all together. Remember that the `.process()` method calls all of the other methods in order.

The last step is completing the `.process()` method.

In [43]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df

    def rename_columns(self):
        # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]
        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"
        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})   
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

    def weather_station_mapping(self):
        weather_map_df = read_from_web_CSV(self.weather_map_data)
        self.df = self.df.merge(  weather_map_df, on = "Field_ID", how="left")
        return weather_map_df
    

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order.
        pass

    def process(self):
        self.ingest_sql_data()
        self.rename_columns()
        self.apply_corrections()
        weather_station_df = self.weather_station_mapping()
        
      

In [44]:
field_processor = FieldDataProcessor()
field_processor.process()

field_df = field_processor.df
field_df['Weather_station'].unique()

2024-05-04 11:08:24,755 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:08:25,251 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:08:25,252 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-05-04 11:08:25,260 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-05-04 11:08:28,154 - data_ingestion - INFO - CSV file read successfully from the web.


array([4, 0, 1, 2, 3], dtype=int64)
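
The swap performed by `rename_columns` can be checked in isolation. The sketch below is a minimal illustration on a made-up two-row DataFrame (the values are assumptions for illustration, not rows from the survey). It applies the same temporary-name approach and compares it with a single `rename` call that maps both names at once, which pandas also accepts.

```python
import pandas as pd

# Hypothetical toy data: the column names are swapped, as in the raw survey table
toy_df = pd.DataFrame({
    "Annual_yield": ["cassava", "tea"],   # actually holds crop types
    "Crop_type": [0.75, 1.07],            # actually holds yields
})


def swap_column_names(df, column1, column2):
    """Return a copy of df with the names of column1 and column2 swapped."""
    temp_name = "__temp_name_for_swap__"
    while temp_name in df.columns:
        temp_name += "_"
    df = df.rename(columns={column1: temp_name, column2: column1})
    return df.rename(columns={temp_name: column2})


swapped = swap_column_names(toy_df, "Annual_yield", "Crop_type")

# A single rename with both mappings gives the same result
swapped_direct = toy_df.rename(
    columns={"Annual_yield": "Crop_type", "Crop_type": "Annual_yield"}
)

print(swapped.columns.tolist())
print(swapped.equals(swapped_direct))
```

The temporary name is only needed when the rename is done in two steps; the single-call form is shorter, but both leave the data itself untouched.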

### Centralising the data pipeline configuration details

Some of the configuration details for the SQL database and the web files live in the `data_ingestion.py` module, and the `field_data_processor.py` module defines the same details again.

In [45]:
# From the data_ingestion.py module

db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


# From the field_data_processor class
class FieldDataProcessor:
    def __init__(self):
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


A better approach is to create one dictionary in our main script that holds all of the parameters, and then reference it from our modules.

Start by adding the configuration details from the `data_ingestion.py` module to the `config_params` dictionary.

In [47]:
config_params = {
    "sql_query": """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """,  # SQL query that joins the survey tables
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',  # Path to the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Columns whose names must be swapped
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},  # Crop type spelling corrections
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",  # Weather station data CSV
    "weather_mapping_csv": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",  # Weather station to field mapping CSV
}
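
Because every module now depends on the same dictionary, a missing or misspelled key would only show up as a `KeyError` once a class tries to read it. As a hedged sketch (the `REQUIRED_KEYS` set and the `check_config` helper are hypothetical names, not part of the course modules), a small check could run before the processors are instantiated:

```python
# Hypothetical helper: neither REQUIRED_KEYS nor check_config exists in the modules
REQUIRED_KEYS = {
    "sql_query",
    "db_path",
    "columns_to_rename",
    "values_to_rename",
    "weather_csv_path",
    "weather_mapping_csv",
}


def check_config(config):
    """Return the sorted list of required keys that are missing from config."""
    return sorted(REQUIRED_KEYS - set(config))


# Example with an incomplete dictionary (values shortened for illustration)
incomplete_config = {
    "sql_query": "SELECT * FROM geographic_features",
    "db_path": "sqlite:///Maji_Ndogo_farm_survey_small.db",
}

missing = check_config(incomplete_config)
print(missing)
```

An empty list means the dictionary has every key the check knows about; it says nothing about whether the values themselves are valid.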

Since these values now reach the code through `config_params`, remove the following lines from the `data_ingestion.py` module:

In [48]:
# Remove these lines from the data_ingestion.py module
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

In the `FieldDataProcessor` class, instead of hard-coding the parameters as strings, reference the `config_params` dictionary.

Alter the attributes of the `FieldDataProcessor` class so that they read from `config_params`: add `config_params` as a parameter to the class's `__init__` method, as shown below.

In [49]:


class FieldDataProcessor:

    def __init__(self, config_params, logging_level="INFO"):  # config_params is now passed in to the class
        self.db_path = config_params['db_path']
        self.sql_query = config_params["sql_query"]
        self.columns_to_rename = config_params["columns_to_rename"]
        self.values_to_rename = config_params["values_to_rename"]
        self.weather_map_data = config_params["weather_mapping_csv"]

        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # The data processing methods of the class
    def ingest_sql_data(self):
        # Create the database engine and load the joined survey tables
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df

    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]
        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"
        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})

        self.logger.info(f"Swapped columns: {column1} with {column2}")

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        # Make the elevation positive and correct the crop strings, e.g. 'cassaval' -> 'cassava'
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))

    def weather_station_mapping(self):
        # Merge the weather station mapping data into the main DataFrame
        weather_map_df = read_from_web_CSV(self.weather_map_data)
        self.df = self.df.merge(weather_map_df, on="Field_ID", how="left")
        return weather_map_df

    def process(self):
        # This is the method we will call. It calls the other methods in order
        # and applies the changes step by step.
        self.ingest_sql_data()
        self.rename_columns()
        self.apply_corrections()
        self.weather_station_mapping()


 Instantiate the class with the new dictionary.

In [50]:
config_params = {
    'db_path': 'sqlite:///Maji_Ndogo_farm_survey_small.db',
    'sql_query': """
        SELECT *
        FROM geographic_features
        LEFT JOIN weather_features USING (Field_ID)
        LEFT JOIN soil_and_crop_features USING (Field_ID)
        LEFT JOIN farm_management_features USING (Field_ID)
    """,
    'columns_to_rename': {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},
    'values_to_rename': {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},
    'weather_mapping_csv': "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
}
field_processor = FieldDataProcessor(config_params)
field_processor.process()

field_df = field_processor.df
field_df['Weather_station'].unique()

2024-05-04 11:16:13,409 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:16:13,925 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:16:13,926 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-05-04 11:16:13,937 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-05-04 11:16:15,977 - data_ingestion - INFO - CSV file read successfully from the web.


array([4, 0, 1, 2, 3], dtype=int64)


### Creating `field_data_processor.py`

We now have a robust data processing class for the field-related data. The final step is to create the module file and document the code.

Complete the `field_data_processor` module: include all of the required content, ensure the module is PEP 8 compliant, include all imports and parameter definitions, and create the `field_data_processor.py` module file.

Restart the kernel before running this code:

In [51]:
import re # Importing all the packages we will use eventually
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor # Importing our new module
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    'db_path': 'sqlite:///Maji_Ndogo_farm_survey_small.db',
    'sql_query': """
        SELECT *
        FROM geographic_features
        LEFT JOIN weather_features USING (Field_ID)
        LEFT JOIN soil_and_crop_features USING (Field_ID)
        LEFT JOIN farm_management_features USING (Field_ID)
    """,
    'columns_to_rename': {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},
    'values_to_rename': {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},
    'weather_mapping_csv': "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
}


# Instantiating the class with config_params passed to the class as a parameter 
field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

# Test
field_df['Weather_station'].unique()

2024-05-04 11:19:49,778 - data_ingestion - INFO - Database engine created successfully.
2024-05-04 11:19:50,006 - data_ingestion - INFO - Query executed successfully.
2024-05-04 11:19:50,009 - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-05-04 11:19:50,016 - field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-05-04 11:19:51,385 - data_ingestion - INFO - CSV file read successfully from the web.


array([4, 0, 1, 2, 3], dtype=int64)
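
The left merge in `weather_station_mapping` keeps every field, even when the mapping file has no station for it. The sketch below uses two small made-up DataFrames (the IDs and elevations are illustrative assumptions) to show how the `validate` and `indicator` arguments of `DataFrame.merge` could surface such gaps. The pipeline itself does not do this.

```python
import pandas as pd

# Hypothetical toy data standing in for the field data and the mapping CSV
fields = pd.DataFrame({"Field_ID": [1, 2, 3], "Elevation": [786.1, 674.3, 826.5]})
mapping = pd.DataFrame({"Field_ID": [1, 2], "Weather_station": [4, 0]})

merged = fields.merge(
    mapping,
    on="Field_ID",
    how="left",
    validate="one_to_one",  # raises MergeError if Field_ID is duplicated on either side
    indicator=True,         # adds a _merge column showing where each row came from
)

# Fields that found no weather station in the mapping
unmapped = merged.loc[merged["_merge"] == "left_only", "Field_ID"].tolist()
print(unmapped)
```

A non-empty `unmapped` list would mean that some fields end up with a missing `Weather_station` value after the merge.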

## Weather data processor

This is the last module. The `WeatherDataProcessor` class will deal with all of the weather-related data. As before, we instantiate the class and then call a `.process()` method to import and clean the data. Here is the code we used last time:

In [None]:
import re  # Regular expressions, used to parse the messages
import pandas as pd

weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
    'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
}


def extract_measurement(message):
    """
    Extracts a numeric measurement value from a given message string.

    The function applies regular expressions to identify and extract
    numeric values related to different types of measurements such as
    Rainfall, Average Temperatures, and Pollution Levels from a text message.
    It returns the key of the matching pattern and the first matching value
    as a floating-point number.

    Parameters:
    message (str): A string message containing the measurement information.

    Returns:
    tuple: (measurement type, value as float) if a match is found;
           otherwise (None, None).

    The function uses the following patterns for extraction:
    - Rainfall: Matches numbers (including decimal) followed by 'mm', optionally spaced.
    - Temperature: Matches numbers (including decimal) followed by 'C', optionally spaced.
    - Pollution_level: Matches numbers (including decimal) following 'Pollution at' or '='.

    Example usage:
    extract_measurement("【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.")
    # Returns: ('Temperature', 12.82)
    """
    for key, pattern in patterns.items():  # Try each pattern in turn
        match = re.search(pattern, message)
        if match:
            # Take the first non-empty group, which holds the measurement value
            # print(match.groups())  # Uncomment this line to help you debug your regex patterns.
            return key, float(next((x for x in match.groups() if x is not None)))

    return None, None


# Applying the function gives a Series of (measurement type, value) tuples
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate 'Measurement' and 'Value' columns by unpacking the tuples
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])
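
To see how the three patterns behave, the sketch below runs the same matching logic on a few messages without downloading any data. The first message is the example from the docstring; the other three are made-up strings for illustration and may not match the wording of real station messages.

```python
import re

patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
    'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
}


def extract(message):
    """Return (measurement type, value) for the first pattern that matches."""
    for key, pattern in patterns.items():
        match = re.search(pattern, message)
        if match:
            return key, float(next(x for x in match.groups() if x is not None))
    return None, None


sample_messages = [
    "【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.",  # from the docstring
    "Rainfall recorded: 18.5 mm",   # hypothetical
    "Pollution at 0.22",            # hypothetical
    "Sensor offline",               # hypothetical, matches nothing
]

results = [extract(message) for message in sample_messages]
for result in results:
    print(result)
```

Because the patterns are tried in dictionary order, a message that happened to contain both a rainfall and a temperature reading would be labelled `Rainfall`.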


Luckily, the other team did most of the work this time, so we only have to fill in a couple of details. First, we need to add more keys to the `config_params` dictionary: the regex patterns we used to parse the messages, and the URL of the weather data.

<br>

⚙️ **Task:** Complete the values for the new keys in `config_params`.

In [None]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

config_params = {
    "sql_query": """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """,  # SQL query that joins the survey tables
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',  # Path to the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Columns whose names must be swapped
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},  # Crop type spelling corrections
    "weather_mapping_csv": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",  # Weather station to field mapping CSV

    # The two new keys
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",  # Weather station data CSV
    "regex_patterns": {
        'Rainfall': r'(\d+(\.\d+)?)\s?mm',
        'Temperature': r'(\d+(\.\d+)?)\s?C',
        'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    },  # Regex patterns used to parse the weather station messages
}

The class is fully set up. We just have to make sure that the formatting is correct and that the module is documented properly.

Complete the `weather_data_processor` module. Include all of the required content, ensure the module is PEP 8 compliant, include all imports and parameter definitions, and create the `weather_data_processor.py` module file.

In [None]:
# These are the imports we're going to use in the weather data processing module
import re
import numpy as np
import pandas as pd
import logging
from data_ingestion import read_from_web_CSV

In [None]:
### START FUNCTION
# Imports belong at module level, so that the methods below can see them
import re
import numpy as np
import pandas as pd
import logging
from data_ingestion import read_from_web_CSV


class WeatherDataProcessor:
    def __init__(self, config_params, logging_level="INFO"):  # config_params is passed in, as with FieldDataProcessor
        self.weather_station_data = config_params['weather_csv_path']
        self.patterns = config_params['regex_patterns']
        self.weather_df = None  # Initialize weather_df as None or as an empty DataFrame
        self.initialize_logging(logging_level)

    def initialize_logging(self, logging_level):
        logger_name = __name__ + ".WeatherDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def weather_station_mapping(self):
        self.weather_df = read_from_web_CSV(self.weather_station_data)
        self.logger.info("Successfully loaded weather station data from the web.") 
        # Here, you can apply any initial transformations to self.weather_df if necessary.

    
    def extract_measurement(self, message):
        for key, pattern in self.patterns.items():
            match = re.search(pattern, message)
            if match:
                self.logger.debug(f"Measurement extracted: {key}")
                return key, float(next((x for x in match.groups() if x is not None)))
        self.logger.debug("No measurement match found.")
        return None, None

    def process_messages(self):
        if self.weather_df is not None:
            result = self.weather_df['Message'].apply(self.extract_measurement)
            self.weather_df['Measurement'], self.weather_df['Value'] = zip(*result)
            self.logger.info("Messages processed and measurements extracted.")
        else:
            self.logger.warning("weather_df is not initialized, skipping message processing.")
        return self.weather_df

    def calculate_means(self):
        if self.weather_df is not None:
            means = self.weather_df.groupby(by=['Weather_station_ID', 'Measurement'])['Value'].mean()
            self.logger.info("Mean values calculated.")
            return means.unstack()
        else:
            self.logger.warning("weather_df is not initialized, cannot calculate means.")
            return None
    
    def process(self):
        self.weather_station_mapping()  # Load and assign data to weather_df
        self.process_messages()  # Process messages to extract measurements
        self.logger.info("Data processing completed.")
### END FUNCTION
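
The `calculate_means` method is not called by `process()`, so it is worth seeing what it returns. A minimal sketch on made-up readings (the station IDs and values are assumptions for illustration) shows how `groupby` followed by `unstack` turns the long table into one row per station and one column per measurement:

```python
import pandas as pd

# Hypothetical readings in the same long format as weather_df after process_messages()
toy_weather_df = pd.DataFrame({
    "Weather_station_ID": [0, 0, 0, 1, 1, 1],
    "Measurement": ["Rainfall", "Rainfall", "Temperature",
                    "Rainfall", "Temperature", "Temperature"],
    "Value": [10.0, 20.0, 13.0, 30.0, 12.0, 14.0],
})

means = (
    toy_weather_df
    .groupby(by=["Weather_station_ID", "Measurement"])["Value"]
    .mean()
    .unstack()  # pivot the Measurement level into columns
)
print(means)
```

The resulting table is indexed by `Weather_station_ID`, which makes it easy to look up the mean of one measurement at one station with `means.loc[station, measurement]`.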

Once the `weather_data_processor` module is set up, run the code below to import the new module and check that it works correctly.

In [None]:
import re
import numpy as np
import pandas as pd
# from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """,  # SQL query that joins the survey tables
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',  # Path to the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Columns whose names must be swapped
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},  # Crop type spelling corrections
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",  # Weather station data CSV
    "weather_mapping_csv": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",  # Weather station to field mapping CSV
    "regex_patterns": {
        'Rainfall': r'(\d+(\.\d+)?)\s?mm',
        'Temperature': r'(\d+(\.\d+)?)\s?C',
        'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    },  # Regex patterns used to parse the weather station messages
}
# Ignoring the field data for now.
# field_processor = FieldDataProcessor(config_params)
# field_processor.process()
# field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

weather_df['Measurement'].unique()

### Validating our data pipeline

We finally have working modules that automatically pull data from the database (or the web), process it, clean it, and return our starting DataFrames. Run the cell below with your `config_params` dictionary.

In [None]:
import re
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """,  # SQL query that joins the survey tables
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',  # Path to the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Columns whose names must be swapped
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},  # Crop type spelling corrections
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",  # Weather station data CSV
    "weather_mapping_csv": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",  # Weather station to field mapping CSV
    "regex_patterns": {
        'Rainfall': r'(\d+(\.\d+)?)\s?mm',
        'Temperature': r'(\d+(\.\d+)?)\s?C',
        'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    },  # Regex patterns used to parse the weather station messages
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

There should be a `validate_data.py` file in the notebook directory. This is a `pytest` script that runs a few tests to check whether the data we have is the data we expect. Have a look at the test script and try to understand what it tests.

`pytest` normally runs from the command line because it is set up to be automated. To test the data, we have to give `pytest` access to that data. The simplest way to do this is by creating CSV files, importing them into `validate_data.py`, and running the tests.

The following code creates CSV files, runs `pytest` in the terminal using `!pytest validate_data.py -v`, and deletes the CSV files once the test is complete.

In [None]:
# Install pytest if it is not already available
!pip install pytest

# Write the DataFrames to CSV files so that validate_data.py can read them
weather_df.to_csv('sampled_weather_df.csv', index=False)
field_df.to_csv('sampled_field_df.csv', index=False)

!pytest validate_data.py -v

import os

# Define the file paths
weather_csv_path = 'sampled_weather_df.csv'
field_csv_path = 'sampled_field_df.csv'

# Delete each CSV file if it exists
for csv_path in (weather_csv_path, field_csv_path):
    if os.path.exists(csv_path):
        os.remove(csv_path)
        print(f"Deleted {csv_path}")
    else:
        print(f"{csv_path} does not exist.")
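
The contents of `validate_data.py` are not shown here, so the following is only a sketch of what `pytest`-style data checks might look like. The test names, the columns checked, and the toy DataFrame are all assumptions for illustration; the real script reads the CSV files instead.

```python
import pandas as pd

# Hypothetical stand-in for the DataFrame read from sampled_field_df.csv
toy_field_df = pd.DataFrame({
    "Field_ID": [1, 2, 3],
    "Elevation": [786.1, 674.3, 826.5],
    "Crop_type": ["cassava", "tea", "wheat"],
})


def test_field_columns_present():
    expected = {"Field_ID", "Elevation", "Crop_type"}
    assert expected.issubset(toy_field_df.columns)


def test_elevation_non_negative():
    assert (toy_field_df["Elevation"] >= 0).all()


def test_crop_types_corrected():
    misspelled = {"cassaval", "wheatn", "teaa"}
    assert not toy_field_df["Crop_type"].isin(misspelled).any()


# pytest would discover and run these functions itself; here they are called directly
for test in (test_field_columns_present, test_elevation_non_negative, test_crop_types_corrected):
    test()
print("all checks passed")
```

Each check mirrors one of the cleaning steps in the pipeline, so a failing test points directly at the step that did not run as expected.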

# Validating the dataset

Before we start the analysis, it's worth noticing how much simpler the data import is now.

In [None]:
import re
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """,  # SQL query that joins the survey tables
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',  # Path to the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Columns whose names must be swapped
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},  # Crop type spelling corrections
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",  # Weather station data CSV
    "weather_mapping_csv": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",  # Weather station to field mapping CSV
    "regex_patterns": {
        'Rainfall': r'(\d+(\.\d+)?)\s?mm',
        'Temperature': r'(\d+(\.\d+)?)\s?C',
        'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    },  # Regex patterns used to parse the weather station messages
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

# Rename 'Ave_temps' in field_df to 'Temperature' to match weather_df
field_df.rename(columns={'Ave_temps': 'Temperature'}, inplace=True)
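
The `regex_patterns` entry in `config_params` can be tried out in isolation. The snippet below is a minimal sketch, not the code inside `WeatherDataProcessor`: the helper `extract_value` and the sample messages are invented for illustration, and the assumption is that each pattern is searched against a free-text message and the first matching capture group is converted to a number.

```python
import re

# Same patterns as in config_params
regex_patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
    'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)',
}

# Invented example messages (assumption: the real messages may look different)
sample_messages = {
    'Rainfall': "Rainfall recorded: 812.5 mm",
    'Temperature': "Average temperature was 13.2 C",
    'Pollution_level': "Pollution at 0.45",
}

def extract_value(pattern, message):
    """Hypothetical helper: return the first captured number as a float, or None."""
    match = re.search(pattern, message)
    if match is None:
        return None
    # The pollution pattern has two alternatives, so take the first group that matched
    first_group = next(g for g in match.groups() if g is not None)
    return float(first_group)

extracted = {
    name: extract_value(regex_patterns[name], message)
    for name, message in sample_messages.items()
}
print(extracted)
```

Trying a pattern on a few hand-written strings like this is a quick way to check that it captures the whole number, including the decimal part.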


Back to the initial plan:

1. Create a null hypothesis.
1. Import the `MD_agric_df` dataset and clean it up.
1. Import the weather data.
1. Map the weather data to the field data.
1. Calculate the means of the weather station dataset and the means of the main dataset.
2. Calculate all the parameters we need to do a t-test. 
3. Interpret our results.

## Hypothesis

We want to know whether our field data represents reality in Maji Ndogo, and we check this against an independent set of data: the weather station measurements. If the means of the field data do not differ significantly from the means of the weather station data, we have no evidence that the two datasets disagree.

<br>

We use a two-tailed test with a significance level of $\alpha = 0.05$, which corresponds to a 95% confidence level:

- $H_0$: There is no significant difference between the means of the two datasets. This is expressed as $\mu_{field} = \mu_{weather}$.

- $H_a$: There is a significant difference between the means of the two datasets. This is expressed as $\mu_{field} \neq \mu_{weather}$.

<br>

If the p-value obtained from the test:
- is less than or equal to the significance level, so $p \leq \alpha$, we reject the null hypothesis.
- is larger than the significance level, so $p > \alpha$, we cannot reject the null hypothesis, as we cannot find a statistically significant difference between the datasets at the 95% confidence level.



First, we import the packages we need and define a few variables. We are importing a new function, `ttest_ind()`, from `scipy.stats`. It takes in two data columns, calculates their means and variances, and returns the t-statistic and the p-value, so the whole t-test is reduced to one line. Since the alternative hypothesis does not make a claim of greater or less than, we use the two-sided t-test. This is the default behaviour of `ttest_ind()`, and it can be made explicit with the `alternative='two-sided'` keyword.

In [None]:
from scipy.stats import ttest_ind
import numpy as np

# Now, the measurements_to_compare can directly use 'Temperature', 'Rainfall', and 'Pollution_level'
measurements_to_compare = ['Temperature', 'Rainfall', 'Pollution_level']

Next, we compare the means of the temperature, rainfall, and pollution data for the fields assigned to a specific weather station. For both datasets, we need to isolate the measurement type and the weather station so that we compare the correct means.

1. Filter both `field_df` and `weather_df` by station ID and measurement using `filter_field_data(df, station_id, measurement)` and `filter_weather_data(df, station_id, measurement)`.
2. Run a t-test on the filtered data using `ttest_ind(data_col1, data_col2, equal_var=False)` from `scipy.stats`. Setting `equal_var=False` performs Welch's t-test, which does not assume that the two samples have equal variances.
3. Use `print_ttest_results(station_id, measurement, p_val, alpha)` to interpret and print the results of the t-test.
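
To make the role of the variances concrete, the sketch below reproduces by hand what `ttest_ind(..., equal_var=False)` (Welch's t-test) computes and compares it with the SciPy result. The two samples are synthetic and made up purely for illustration; they are not the Maji Ndogo data.

```python
import numpy as np
from scipy.stats import t as t_dist, ttest_ind

# Synthetic samples with similar means but different spreads and sizes
rng = np.random.default_rng(42)
sample_a = rng.normal(loc=13.0, scale=1.0, size=200)
sample_b = rng.normal(loc=13.1, scale=2.5, size=50)

# Sample means, unbiased sample variances (ddof=1), and sample sizes
mean_a, mean_b = sample_a.mean(), sample_b.mean()
var_a, var_b = sample_a.var(ddof=1), sample_b.var(ddof=1)
n_a, n_b = sample_a.size, sample_b.size

# Squared standard error of each sample mean
sq_se_a, sq_se_b = var_a / n_a, var_b / n_b

# Welch's t-statistic: difference in means over the combined standard error
t_manual = (mean_a - mean_b) / np.sqrt(sq_se_a + sq_se_b)

# Welch-Satterthwaite approximation of the degrees of freedom
dof = (sq_se_a + sq_se_b) ** 2 / (
    sq_se_a ** 2 / (n_a - 1) + sq_se_b ** 2 / (n_b - 1)
)

# Two-sided p-value from the t-distribution
p_manual = 2 * t_dist.sf(abs(t_manual), dof)

# The same test in one line with SciPy
t_scipy, p_scipy = ttest_ind(sample_a, sample_b, equal_var=False)

print(f"manual: t = {t_manual:.5f}, p = {p_manual:.5f}")
print(f"scipy:  t = {t_scipy:.5f}, p = {p_scipy:.5f}")
```

Both lines should print the same values, which shows that the one-line call accounts for the means, the variances, and the sample sizes of both columns.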

We define these functions one at a time, focusing on `Temperature` for `station_id = 0`, and then integrate them into a loop that iterates over each station ID and measurement type.

We start with a `filter_field_data` function that takes in the `field_df` DataFrame, the `station_id`, and the `measurement` type, and returns a single column (Series) of data filtered by `station_id` and `measurement`.

In [None]:

def filter_field_data(df, station_id, measurement):
    """
    Return the non-null values of the `measurement` column for the fields
    assigned to the weather station `station_id`.
    """
    mask = (df['Weather_station'] == station_id) & df[measurement].notnull()
    return df.loc[mask, measurement]


In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
field_values

In [57]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
print(f"Shape: {field_values.shape}, First value: {field_values.iloc[0]} ")

Shape: (1375,), First value: 13.35 



Next, we create a data filter function that takes in the `weather_df` DataFrame, the `station_id`, and the `measurement` type, and returns a **single column** (Series) of data filtered by `station_id` and `measurement`.

In [None]:

def filter_weather_data(df, station_id, measurement):
    """
    Filter the weather data to the rows where 'Weather_station_ID' equals
    station_id and 'Measurement' equals measurement, then return the 'Value' column.

    Parameters:
    - df (DataFrame): The input DataFrame containing weather data.
    - station_id (int): The ID of the weather station to filter by.
    - measurement (str): The name of the measurement to filter by.

    Returns:
    - Series: The 'Value' column of the matching rows.
    """
    mask = (df['Weather_station_ID'] == station_id) & (df['Measurement'] == measurement)
    return df.loc[mask, 'Value']
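
The two filters differ because the two DataFrames are laid out differently: the field data has one column per measurement, while the weather data has one row per reading with the measurement name in a `Measurement` column and the number in a `Value` column. The sketch below applies the same filtering logic to two tiny DataFrames. All values are invented for illustration and the `toy_` names are hypothetical.

```python
import pandas as pd

# Toy field data: one row per field, one column per measurement
toy_field_df = pd.DataFrame({
    'Field_ID': [1, 2, 3, 4],
    'Weather_station': [0, 0, 1, 0],
    'Temperature': [13.4, None, 14.1, 12.9],
})

# Toy weather data: one row per reading, measurement type stored as a value
toy_weather_df = pd.DataFrame({
    'Weather_station_ID': [0, 0, 1, 0],
    'Measurement': ['Temperature', 'Rainfall', 'Temperature', 'Temperature'],
    'Value': [13.0, 870.0, 14.5, 13.6],
})

# Field data: filter rows by station, drop nulls, then pick the measurement column
field_mask = (toy_field_df['Weather_station'] == 0) & toy_field_df['Temperature'].notnull()
toy_field_values = toy_field_df.loc[field_mask, 'Temperature']

# Weather data: filter rows by station AND measurement name, then pick 'Value'
weather_mask = (
    (toy_weather_df['Weather_station_ID'] == 0)
    & (toy_weather_df['Measurement'] == 'Temperature')
)
toy_weather_values = toy_weather_df.loc[weather_mask, 'Value']

print(toy_field_values.tolist())    # [13.4, 12.9]
print(toy_weather_values.tolist())  # [13.0, 13.6]
```

In both cases the result is a single Series of numbers for one station and one measurement, which is the input the t-test needs.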



In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement

weather_values = filter_weather_data(weather_df, station_id, measurement)
weather_values


In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement

weather_values = filter_weather_data(weather_df, station_id, measurement)

print(f"Shape: {weather_values.shape}, First value: {weather_values.iloc[0]}")

Now we write a function that calculates the t-statistic and the p-value. The function accepts two **single columns** of data and returns a tuple of the t-statistic and the p-value.

In [None]:


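def run_ttest(column_a, column_b):
    """Run Welch's t-test (unequal variances) and return (t_statistic, p_value)."""
    # ttest_ind was imported from scipy.stats at the start of this section
    t_statistic, p_value = ttest_ind(column_a, column_b, equal_var=False)
    return t_statistic, p_value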


In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
weather_values = filter_weather_data(weather_df, station_id, measurement)

# Perform t-test
t_stat, p_val = run_ttest(field_values, weather_values)
print(f"T-stat: {t_stat:.5f}, p-value: {p_val:.5f}")

Next, we write a function that interprets and prints the t-test result.

In [None]:

def print_ttest_results(station_id, measurement, p_val, alpha):
    """
    Interprets and prints the results of a t-test based on the p-value.
    """
    # Reject the null hypothesis when p <= alpha, as stated in the hypothesis section
    if p_val <= alpha:
        print(f"   Significant difference in {measurement} detected at Station {station_id}, (P-Value: {p_val:.5f} <= {alpha}). Null hypothesis rejected.")
    else:
        print(f"   No significant difference in {measurement} detected at Station {station_id}, (P-Value: {p_val:.5f} > {alpha}). Null hypothesis not rejected.")



**Input:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
weather_values = filter_weather_data(weather_df, station_id, measurement)

# Perform t-test
t_stat, p_val = run_ttest(field_values, weather_values)
print_ttest_results(station_id, measurement, p_val, alpha)

Finally, we write a function that loops over every `station_id` and every measurement in `measurements_to_compare`, performs a t-test, and prints the results. The function accepts `field_df`, `weather_df`, `list_measurements_to_compare`, and `alpha`, where `alpha` defaults to 0.05.

In [None]:

def hypothesis_results(field_df, weather_df, list_measurements_to_compare, alpha=0.05):
    """
    Perform hypothesis testing for each weather station and measurement.

    Args:
        field_df (pandas.DataFrame): DataFrame containing field data.
        weather_df (pandas.DataFrame): DataFrame containing weather data.
        list_measurements_to_compare (list): List of measurements to compare.
        alpha (float, optional): The significance level for hypothesis testing. Defaults to 0.05.

    Returns:
        None: Prints the results of the hypothesis testing.
    """
    station_ids = sorted(field_df['Weather_station'].unique())
    # Loop over stations first so that the printed results are grouped by station
    for station_id in station_ids:
        for measurement in list_measurements_to_compare:
            field_values = filter_field_data(field_df, station_id, measurement)
            weather_values = filter_weather_data(weather_df, station_id, measurement)
            t_stat, p_val = run_ttest(field_values, weather_values)
            print_ttest_results(station_id, measurement, p_val, alpha)


In [None]:
alpha = 0.05
hypothesis_results(field_df, weather_df, measurements_to_compare, alpha)

**Expected outcome:**
```python 
   No significant difference in Temperature detected at Station 0, (P-Value: 0.90761 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 0, (P-Value: 0.21621 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 0, (P-Value: 0.56418 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 1, (P-Value: 0.47241 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 1, (P-Value: 0.54499 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 1, (P-Value: 0.24410 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 2, (P-Value: 0.88671 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 2, (P-Value: 0.36466 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 2, (P-Value: 0.99388 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 3, (P-Value: 0.66445 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 3, (P-Value: 0.39847 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 3, (P-Value: 0.15466 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 4, (P-Value: 0.88575 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 4, (P-Value: 0.33237 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 4, (P-Value: 0.21508 > 0.05). Null hypothesis not rejected.
   ```

This means there is no evidence to suggest that the weather data is different from the field data. We can therefore be more confident that our field data, at least in terms of temperature, rainfall, and pollution level, reflects reality.
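
If a table is easier to read than fifteen printed lines, the same loop could collect its results in a DataFrame instead. The sketch below is one possible way to do that. The function name `summarise_ttests` is hypothetical, and the data is synthetic so that the block runs on its own; with the real data, `field_df` and `weather_df` would be passed in instead.

```python
import numpy as np
import pandas as pd
from scipy.stats import ttest_ind

def summarise_ttests(field_df, weather_df, measurements, alpha=0.05):
    """Hypothetical helper: one row per (station, measurement) with the test result."""
    rows = []
    for station_id in sorted(field_df['Weather_station'].unique()):
        for measurement in measurements:
            # Field data: one column per measurement
            field_mask = field_df['Weather_station'] == station_id
            field_values = field_df.loc[field_mask, measurement].dropna()
            # Weather data: measurement name stored in the 'Measurement' column
            weather_mask = (
                (weather_df['Weather_station_ID'] == station_id)
                & (weather_df['Measurement'] == measurement)
            )
            weather_values = weather_df.loc[weather_mask, 'Value']
            t_stat, p_val = ttest_ind(field_values, weather_values, equal_var=False)
            rows.append({
                'Station': station_id,
                'Measurement': measurement,
                't_stat': t_stat,
                'p_value': p_val,
                'reject_H0': p_val <= alpha,
            })
    return pd.DataFrame(rows)

# Synthetic stand-ins for field_df and weather_df (values are invented)
rng = np.random.default_rng(0)
toy_field_df = pd.DataFrame({
    'Weather_station': np.repeat([0, 1], 100),
    'Temperature': rng.normal(13.0, 1.0, 200),
    'Rainfall': rng.normal(900.0, 50.0, 200),
})
toy_weather_df = pd.DataFrame({
    'Weather_station_ID': np.tile(np.repeat([0, 1], 30), 2),
    'Measurement': np.repeat(['Temperature', 'Rainfall'], 60),
    'Value': np.concatenate([rng.normal(13.0, 1.0, 60), rng.normal(900.0, 50.0, 60)]),
})

summary = summarise_ttests(toy_field_df, toy_weather_df, ['Temperature', 'Rainfall'])
print(summary)
```

A table like this can be sorted by `p_value` or filtered on `reject_H0`, which makes it easier to spot any station and measurement pair that needs a closer look.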

