In [1]:
# Uncomment the lines below to install these modules if they are not found on
# your system. `%pip` installs into the running kernel's own environment,
# which `!pip3` does not guarantee; pin versions for a reproducible setup.
# %pip install mysql-connector-python
# %pip install numpy
# %pip install pandas

'\n!pip3 install mysql-connector-python\n!pip3 install numpy\n!pip3 install pandas\n'

In [2]:
#Library used in the project
import mysql.connector
from mysql.connector import Error
import numpy as np
import pandas as pd
import os
import logging
import datetime
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Starting Logging
# Route INFO-and-above records into weather_data.log for this run.
logging.basicConfig(level=logging.INFO, filename='weather_data.log')
logger = logging.getLogger(__name__)
logger.info('Log Started: Weather Project')

In [36]:
# Connecting with MySQL
# Connection settings are read from the environment when set; the fallbacks
# keep the original local-development values so existing setups still work.
# TODO: remove the hard-coded password fallback before sharing this notebook.
logger.info('\t SQL Connection Initiated')

try:
    conn = mysql.connector.connect(host=os.environ.get('WEATHER_DB_HOST', 'localhost'),
                                   database=os.environ.get('WEATHER_DB_NAME', 'weatherDB'),
                                   user=os.environ.get('WEATHER_DB_USER', 'varun'),
                                   password=os.environ.get('WEATHER_DB_PASSWORD', '12345'))

    mycursor = conn.cursor()
    if conn.is_connected():
        print(f"Connected to MySQL Server version: {conn.get_server_info()}")
        # Confirm which schema the session is actually using.
        mycursor.execute("select database();")
        record = mycursor.fetchone()
        print(f"You're connected to database: {record[0]}")
    logger.info('\t SQL Connnection was successful for weather data.')
except Error as e:
    print("Error while connecting to MySQL", e)
    # `e` is an exception object, not a str: the former `"..." + e` raised a
    # TypeError here and masked the real connection error.
    logger.error("Error while connecting to MySQL: %s", e)

Connected to MySQL Server version: 8.2.0
You're connected to database: weatherdb


## Problem 1 : Data Modeling

I built the `weatherDB` schema in MySQL and ran the script that creates the schema and the `weather_records` table, which stores the ingested data.

## Problem 2 : Ingestion

In [5]:
# Mark the start of the ingestion stage in the run log.
logger.log(logging.INFO, 'Data Ingestion: started in Weather Project')

In [6]:
# Input folder holding one tab-separated data file per station; the flag is
# cleared by the next cell when the folder cannot be found.
folderName, folderFound = 'wx_data', True

In [7]:
# Guard the ingestion step: it lists `folderName`, so that path must be an
# existing directory (os.path.exists would also accept a plain file, which
# then fails inside os.listdir).
if not os.path.isdir(folderName):
    missingMsg = f'Error: {folderName} folder doesn\'t exist in current working directory.'
    logger.error(missingMsg)
    print(missingMsg)
    folderFound = False

In [8]:
if folderFound:

    logger.info('\t' + str(datetime.datetime.now()) + ' : Starting Ingestion into MySQL from wx_data')
    insertQuery = "INSERT INTO weather_records (record_fid, record_date, record_maxtemp, record_mintemp, record_precipitation) VALUES (%s, %s, %s, %s, %s)"
    # Parameterized so values read from the data files are never spliced into SQL text.
    existsQuery = "SELECT 1 FROM weather_records WHERE record_fid = %s AND record_date = %s"

    # Number of records (rows, not files) inserted into the weather_records table.
    fileCount = 0

    # Read each station file and insert only rows not already stored, so the
    # cell can be re-run without creating duplicates.
    for filename in sorted(os.listdir(folderName)):
        filePath = os.path.join(folderName, filename)
        # Skip sub-directories and hidden entries (e.g. editor/OS metadata files).
        if filename.startswith('.') or not os.path.isfile(filePath):
            continue
        # The station id is the file name without its extension.
        fid = filename.split('.')[0]
        with open(filePath, 'r') as f:
            for line in f:
                # Tab-separated fields: date, max temp, min temp, precipitation.
                data = [field.strip() for field in line.strip().split('\t')]
                if len(data) < 4:
                    continue  # blank or malformed line
                mycursor.execute(existsQuery, (fid, data[0]))
                if len(mycursor.fetchall()) == 0:
                    val = (fid, data[0], data[1], data[2], data[3])
                    mycursor.execute(insertQuery, val)
                    fileCount += 1
        # One commit per file instead of one round-trip commit per row.
        conn.commit()
    logger.info(f'\tNumber of records ingested in weather_records table : {fileCount}')
    logger.info('\t' + str(datetime.datetime.now()) + ' : Stopping Ingestion into MySQL from wx_data')

## Problem 3 : Data Analysis

In [9]:
# Mark the start of the analysis stage in the run log.
logger.log(logging.INFO, 'Data Analysis Started.')

### => Extract data from MySQL


In [45]:
# Pull every ingested row of weather_records into a DataFrame for analysis.
logger.info('\tBuilding dataframe by reading data from MySQL')
query = 'select * from weather_records'
df = pd.read_sql(sql=query, con=conn)
df.head()

Unnamed: 0,record_fid,record_date,record_maxtemp,record_mintemp,record_precipitation
0,USC00110072,1985-01-01,-22,-128,94
1,USC00110072,1985-01-02,-122,-217,0
2,USC00110072,1985-01-03,-106,-244,0
3,USC00110072,1985-01-04,-56,-189,0
4,USC00110072,1985-01-05,11,-78,0


### => Copy the data into a new dataframe and add a `year` column derived from the date column

In [46]:
# Work on a copy so the raw `df` read from MySQL stays untouched.
newdf = df.copy()
logger.info('\tCopying into new dataframe.')

# The year is the text before the first '-' of the date rendered as a string
# (e.g. '1985-01-01' -> 1985).
yearText = newdf['record_date'].astype(str).str.split("-", n=1).str[0]
newdf['year'] = yearText.astype(int)
logger.info("\tAdding anothor column 'year' into newdf")

# Only the year is needed downstream, so drop the full date.
newdf = newdf.drop(columns=['record_date'])
logger.info('\tremoving record_date column from newdf')

newdf.head()

Unnamed: 0,record_fid,record_maxtemp,record_mintemp,record_precipitation,year
0,USC00110072,-22,-128,94,1985
1,USC00110072,-122,-217,0,1985
2,USC00110072,-106,-244,0,1985
3,USC00110072,-56,-189,0,1985
4,USC00110072,11,-78,0,1985


### Replacing -9999 (treated as a missing-value marker) with np.nan in the dataframe

In [47]:
# Treat -9999 as missing so the NaN-skipping statistics below ignore it.
newdf = newdf.replace(-9999, np.nan)

In [54]:
insertQuery = "INSERT INTO station_results (result_fid, result_year, result_avg_maxtemp, result_avg_mintemp, result_total_precipitation ) VALUES (%s, %s, %s, %s, %s)"
existsQuery = "SELECT 1 FROM station_results WHERE result_fid = %s AND result_year = %s"
logger.info('\t' + 'Inserting result into station_results table.')

# Inclusive range of years reported for every station; a year with no data
# still gets a row, with NULL statistics.
FIRST_YEAR, LAST_YEAR = 1985, 2014


def _statistic_to_sql(value):
    """Format one statistic for printing and for insertion.

    Returns ``(display, db_value)``: the value formatted with 3 decimals for
    both when it is a number, or ``('NULL', None)`` when it is NaN/missing.
    ``None`` makes the connector send SQL NULL; the literal strings 'NULL' or
    'nan' are not SQL NULL. (NaN must be tested with pd.isna -- ``x == np.nan``
    is always False.)
    """
    if pd.isna(value):
        return 'NULL', None
    text = f'{value:0.3f}'
    return text, text


# Aggregate every (station, year) pair in one pass instead of re-filtering
# newdf three times per pair. mean() skips NaN; sum(min_count=1) yields NaN
# (-> NULL) for a group with no valid precipitation instead of 0.
grouped = newdf.groupby(['record_fid', 'year'])
yearlyStats = pd.DataFrame({
    'avgMaxTemp': grouped['record_maxtemp'].mean(),
    'avgMinTemp': grouped['record_mintemp'].mean(),
    'sumPrecipitation': grouped['record_precipitation'].sum(min_count=1),
}).to_dict('index')
missingYear = {'avgMaxTemp': np.nan, 'avgMinTemp': np.nan, 'sumPrecipitation': np.nan}

# from dataframe reading each stations code one by one
for station in newdf.record_fid.unique():

    print('')
    print("*"*100)
    print('\t{:15} {:8} {:13} {:13} {:18}' .format("Station","Year","AvgMaxTemp","AvgMinTemp","Total Accumulated Precipitation"))
    print("*"*100)
    for year in range(FIRST_YEAR, LAST_YEAR + 1):
        stats = yearlyStats.get((station, year), missingYear)
        avgMaxTempStr, avgMaxTempVal = _statistic_to_sql(stats['avgMaxTemp'])
        avgMinTempStr, avgMinTempVal = _statistic_to_sql(stats['avgMinTemp'])
        sumPrecipitationStr, sumPrecipitationVal = _statistic_to_sql(stats['sumPrecipitation'])

        print('\t{:15} {:4} {:>13} {:>13} {:>18}' .format(station,year,avgMaxTempStr,avgMinTempStr,sumPrecipitationStr))

        # Skip pairs already stored so re-running the cell does not duplicate rows.
        mycursor.execute(existsQuery, (station, year))
        if len(mycursor.fetchall()) == 0:
            val = (station, year, avgMaxTempVal, avgMinTempVal, sumPrecipitationVal)
            mycursor.execute(insertQuery, val)

# Single commit for the whole batch instead of one per inserted row.
conn.commit()


****************************************************************************************************
	Station         Year     AvgMaxTemp    AvgMinTemp    Total Accumulated Precipitation
****************************************************************************************************
	USC00110072     1985       153.348        43.264             7801.0
	USC00110072     1986       126.963        21.762             5053.0
	USC00110072     1987       177.603        63.299             7936.0
	USC00110072     1988       173.473        45.350             5410.0
	USC00110072     1989       156.515        39.836             7937.0
	USC00110072     1990       169.003        59.603            11694.0
	USC00110072     1991       164.160        52.613             7906.0
	USC00110072     1992       149.917        50.378             9326.0
	USC00110072     1993       139.597        47.453            14207.0
	USC00110072     1994       149.610        37.894             6688.0
	USC00110072     1995

In [55]:
### Reading results from station_results table
# Read the persisted per-station yearly results back from MySQL as a check.
logger.info('\tReading results from MySQL station_results table')
query = 'select * from station_results'
rdf = pd.read_sql(query, con=conn)
resultCount = len(rdf)
print(f'Number of recors in station_results table : {resultCount}')
rdf.head()

Number of recors in station_results table : 5010


Unnamed: 0,result_fid,result_year,result_avg_maxtemp,result_avg_mintemp,result_total_precipitation
0,USC00110072,1985,153.348,43.264,7801.0
1,USC00110072,1986,126.963,21.762,5053.0
2,USC00110072,1987,177.603,63.299,7936.0
3,USC00110072,1988,173.473,45.35,5410.0
4,USC00110072,1989,156.515,39.836,7937.0


In [None]:
## Problem 4 : Rest API
# NOTE(review): this is a code cell holding only a heading; no REST API code
# follows it in this notebook as shown. TODO: implement it (or link to where it
# lives) and turn this heading into a markdown cell.

In [None]:
# Closing MySQL Connection
# Release the cursor before the connection that owns it. Both names are only
# bound if the connection cell succeeded, so guard against a failed connect.
if 'mycursor' in globals():
    mycursor.close()
if 'conn' in globals() and conn.is_connected():
    conn.close()