## Setup

### Install dependencies

In [None]:
%%writefile README.md
Overview

--------

This coding exercise will help us understand how you approach some of the common problems we see in data engineering. Ask questions if things are unclear, use best practices and common software patterns, and feel free to go the extra mile to show off your skills. Imagine you are handing off your completed project to someone else to maintain -- it should be clear to another developer how things work and your reasoning behind your design decisions.

You will be asked to ingest some weather and crop yield data (provided), design a database schema for it, and expose the data through a REST API. You may use whatever software tools you would like to answer the problems below, but keep in mind the skills required for the position you are applying for and how best to demonstrate them. Read through all the problems before beginning, as later problems may inform your approach to earlier problems.

You can retrieve the data required for this exercise by cloning this repository:

https://github.com/corteva/code-challenge-template

Weather Data Description

------------------------

The wx\_data directory has files containing weather data records from 1985-01-01 to 2014-12-31. Each file corresponds to a particular weather station from Nebraska, Iowa, Illinois, Indiana, or Ohio.

Each line in the file contains 4 records separated by tabs:

1. The date (YYYYMMDD format)

2. The maximum temperature for that day (in tenths of a degree Celsius)

3. The minimum temperature for that day (in tenths of a degree Celsius)

4. The amount of precipitation for that day (in tenths of a millimeter)

Missing values are indicated by the value -9999.

Problem 1 - Data Modeling

-------------------------

Choose a database to use for this coding exercise (SQLite, Postgres, etc.). Design a data model to represent the weather data records. If you use an ORM, your answer should be in the form of that ORM's data definition format. If you use pure SQL, your answer should be in the form of DDL statements.

Problem 2 - Ingestion

---------------------

Write code to ingest the weather data from the raw text files supplied into your database, using the model you designed. Check for duplicates: if your code is run twice, you should not end up with multiple rows with the same data in your database. Your code should also produce log output indicating start and end times and number of records ingested.

Problem 3 - Data Analysis

-------------------------

For every year, for every weather station, calculate:

\* Average maximum temperature (in degrees Celsius)

\* Average minimum temperature (in degrees Celsius)

\* Total accumulated precipitation (in centimeters)

Ignore missing data when calculating these statistics.

Design a new data model to store the results. Use NULL for statistics that cannot be calculated.

Your answer should include the new model definition as well as the code used to calculate the new values and store them in the database.

Problem 4 - REST API

--------------------

Choose a web framework (e.g. Flask, Django REST Framework). Create a REST API with the following GET endpoints:

/api/weather

/api/weather/stats

Both endpoints should return a JSON-formatted response with a representation of the ingested/calculated data in your database. Allow clients to filter the response by date and station ID (where present) using the query string. Data should be paginated.

Include a Swagger/OpenAPI endpoint that provides automatic documentation of your API.

Your answer should include all files necessary to run your API locally, along with any unit tests.

Extra Credit - Deployment

-------------------------

(Optional.) Assume you are asked to get your code running in the cloud using AWS. What tools and AWS services would you use to deploy the API, database, and a scheduled version of your data ingestion code? Write up a description of your approach.

---

Submitting Your Answers

-----------------------

Consider using a linter, code formatter, and including tests and code comments. Anything that helps us understand your thought process is helpful!

Please provide us with a link to your Git repository, hosted on GitHub/GitLab, containing your answers and code.

Overwriting README.md


In [None]:
%%writefile requirements.txt
requests
flask
pytest

Overwriting requirements.txt


In [None]:
!pip install -r requirements.txt

### Data Download

In [None]:
!mkdir submission
%cd submission
!git clone https://github.com/corteva/code-challenge-template
!mv code-challenge-template/wx_data data
!rm -r code-challenge-template

### Imports

In [None]:
import sqlite3
import os
import time

### Parameters

In [None]:
# Path to the directory containing the weather data files
DATA_DIR = "./data"

# Path to the sqlite database
DATABASE_PATH = 'weather.db'

### Functions

In [None]:
def execute_sql(conn, query):
    """Execute a single SQL statement on the given connection and commit it.

    Args:
        conn (sqlite3.Connection): Open connection to the database.
        query (str): SQL statement to execute.

    Returns:
        None

    Raises:
        sqlite3.Error: If the statement cannot be executed (for example, invalid SQL).
    """
    # create a cursor to the connection
    c = conn.cursor()

    # Execute the query
    c.execute(query)

    # Save (commit) the changes
    conn.commit()

## Data Modeling

In [None]:
# Create the SQLite database file if it does not exist and connect to it
conn = sqlite3.connect(DATABASE_PATH)

# DDL to create the table; IF NOT EXISTS keeps this cell safe to re-run
query = '''
CREATE TABLE IF NOT EXISTS weather_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    station_id INTEGER NOT NULL,
    date INTEGER NOT NULL,          -- YYYYMMDD
    max_temp REAL NOT NULL,         -- degrees Celsius
    min_temp REAL NOT NULL,         -- degrees Celsius
    precipitation REAL NOT NULL     -- millimeters
)
'''

# execute the query
execute_sql(conn, query)

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()
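
The duplicate check required in the ingestion step only works if the database itself rejects repeated rows. The following is a minimal sketch, not the model used above: it assumes a uniqueness constraint on `(station_id, date)` and nullable measurement columns, and the table name `weather_data_sketch` and the sample record are hypothetical. It runs against an in-memory database and shows that `INSERT OR IGNORE` then skips a record that is inserted twice.

```python
import sqlite3

# Hypothetical variant of the table: one row per station and day,
# with NULL allowed for measurements that are missing in the source files.
DDL_SKETCH = """
CREATE TABLE weather_data_sketch (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    station_id INTEGER NOT NULL,
    date INTEGER NOT NULL,
    max_temp REAL,
    min_temp REAL,
    precipitation REAL,
    UNIQUE (station_id, date)
)
"""


def count_rows_after_duplicate_insert():
    """Insert the same made-up record twice and return the resulting row count."""
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL_SKETCH)
    record = (1, 19850101, -2.2, -12.8, None)
    for _ in range(2):
        # The second insert violates the UNIQUE constraint and is silently skipped
        conn.execute(
            "INSERT OR IGNORE INTO weather_data_sketch "
            "(station_id, date, max_temp, min_temp, precipitation) "
            "VALUES (?, ?, ?, ?, ?)",
            record,
        )
    conn.commit()
    row_count = conn.execute("SELECT COUNT(*) FROM weather_data_sketch").fetchone()[0]
    conn.close()
    return row_count


print(count_rows_after_duplicate_insert())
```

Without a uniqueness constraint, the autoincremented `id` makes every row distinct, so `INSERT OR IGNORE` has nothing to ignore.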

## Ingestion

In [None]:
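# Connect to the database
conn = sqlite3.connect(DATABASE_PATH)

# start time of the data ingestion process
start_time = time.time()

# INSERT OR IGNORE only skips rows that violate a uniqueness constraint, so
# enforce one row per station and day before loading any data
conn.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_weather_station_date
ON weather_data (station_id, date)
""")

insert_sql = """
INSERT OR IGNORE INTO weather_data
(station_id, date, max_temp, min_temp, precipitation)
VALUES (?, ?, ?, ?, ?)
"""

# Iterate through the data files and insert data into the database
for filename in os.listdir(DATA_DIR):
    if filename.startswith("USC"):
        # Use every digit after the "USC" prefix; a three-character slice of
        # the name is not enough to tell stations apart
        station_id = int(os.path.splitext(filename)[0][3:])
        with open(os.path.join(DATA_DIR, filename), "r") as f:
            for line in f:
                # Strip each field so padded values still compare and convert cleanly
                date, max_temp, min_temp, precipitation = [field.strip() for field in line.split("\t")]
                # Skip lines where any of the three measurements is missing
                if "-9999" in (max_temp, min_temp, precipitation):
                    continue
                # Convert tenths of a unit to degrees Celsius and millimeters
                conn.execute(insert_sql, (station_id, int(date), int(max_temp)/10, int(min_temp)/10, int(precipitation)/10))
conn.commit()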

# Print the number of records ingested
select_count_sql = "SELECT COUNT(*) FROM weather_data"
print(f"Number of records ingested: {conn.execute(select_count_sql).fetchone()[0]}")

# Close the connection
conn.close()
end_time = time.time()
print(f'Records were ingested in {end_time - start_time:.2f} seconds')

Number of records ingested: 1669107
Records were ingested in 10.23 seconds
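
The README asks for log output with start and end times and the number of records ingested. The sketch below shows one way to do that with the standard `logging` module. The helper names `parse_line` and `ingest_with_logging` are hypothetical, the two sample lines are made up, and, unlike the cell above, missing values are kept as `None` instead of dropping the whole line; that choice is an assumption, not the notebook's behavior.

```python
import logging
from datetime import datetime

MISSING = "-9999"  # sentinel for missing values, as described in the README


def parse_line(line):
    """Parse one tab-separated record into (date, max_temp, min_temp, precipitation).

    Values are converted from tenths to whole units; missing values become None.
    """
    fields = [field.strip() for field in line.split("\t")]
    date = int(fields[0])
    values = [None if f == MISSING else int(f) / 10 for f in fields[1:4]]
    return (date, *values)


def ingest_with_logging(lines):
    """Parse the given lines and log start time, end time and record count."""
    logger = logging.getLogger("ingestion")
    start = datetime.now()
    logger.info("Ingestion started at %s", start.isoformat())
    records = [parse_line(line) for line in lines if line.strip()]
    end = datetime.now()
    logger.info("Ingestion ended at %s, %d records parsed", end.isoformat(), len(records))
    return records


logging.basicConfig(level=logging.INFO)
# Made-up sample lines in the documented format (date, max, min, precipitation)
sample = ["19850101\t-22\t-128\t94\n", "19850102\t-9999\t-150\t-9999\n"]
print(ingest_with_logging(sample))
```

In a real run, the parsed tuples would be passed to the database insert, and the logged count would be the number of rows actually written.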


## Data Analysis

In [None]:
# Connect to the existing SQLite database
conn = sqlite3.connect(DATABASE_PATH)

# DDL for the table that stores the yearly statistics per station
query_create_table = '''
CREATE TABLE IF NOT EXISTS weather_data_analysis (
    year INTEGER NOT NULL,
    station_id INTEGER NOT NULL,
    avg_max_temp REAL,
    avg_min_temp REAL,
    total_precipitation REAL,
    PRIMARY KEY (year, station_id)
);
'''

# Values in weather_data are already stored in degrees Celsius and millimeters,
# so temperatures are averaged as-is and precipitation is converted from mm to cm.
# AVG and SUM skip NULLs and return NULL when there is nothing to aggregate.
query_insert = '''
INSERT OR REPLACE INTO weather_data_analysis
SELECT
    CAST(substr(date, 1, 4) AS INTEGER) AS year,
    station_id,
    AVG(max_temp) AS avg_max_temp,
    AVG(min_temp) AS avg_min_temp,
    SUM(precipitation) / 10.0 AS total_precipitation
FROM weather_data
GROUP BY year, station_id;
'''

# Execute the queries
execute_sql(conn, query_create_table)
execute_sql(conn, query_insert)

# Close the connection
conn.close()
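
The README asks for NULL when a statistic cannot be calculated. The sketch below, run on an in-memory table, illustrates that SQLite's `AVG` and `SUM` skip NULL inputs and return NULL when a group has no non-NULL values. The table name `obs`, the function `yearly_stats`, and the sample rows are made up for the illustration, and precipitation is assumed to be stored in millimeters.

```python
import sqlite3


def yearly_stats(rows):
    """Aggregate (station_id, date, max_temp, precipitation_mm) rows per year and station."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE obs (station_id INTEGER, date INTEGER, max_temp REAL, precipitation REAL)"
    )
    conn.executemany("INSERT INTO obs VALUES (?, ?, ?, ?)", rows)
    result = conn.execute(
        """
        SELECT date / 10000 AS year,           -- integer division: 19850101 -> 1985
               station_id,
               AVG(max_temp) AS avg_max_temp,  -- NULL inputs are ignored
               SUM(precipitation) / 10.0 AS total_precipitation_cm  -- mm -> cm
        FROM obs
        GROUP BY year, station_id
        ORDER BY year, station_id
        """
    ).fetchall()
    conn.close()
    return result


rows = [
    (1, 19850101, 10.0, 5.0),
    (1, 19850102, 20.0, None),   # precipitation missing on this day
    (1, 19860101, None, None),   # nothing measurable in 1986
]
print(yearly_stats(rows))
# [(1985, 1, 15.0, 0.5), (1986, 1, None, None)]
```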

## REST API

In [None]:
%%writefile app.py
from flask import Flask, request, jsonify
import sqlite3


app = Flask(__name__)

@app.route('/api')
def hello_world():
    return 'API for weather data!'


conn = sqlite3.connect('weather.db', check_same_thread=False)


@app.route('/api/weather', methods=['GET'])
def get_weather():

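    # get query parameters (values that are not integers are treated as absent)
    station_id = request.args.get('station_id', type=int)
    date = request.args.get('date', type=int)

    # get page parameters
    page = max(request.args.get('page', default=1, type=int), 1)
    per_page = 10
    offset = (page - 1) * per_page

    # build the query with placeholders so user input is never interpolated into SQL
    query = 'SELECT * FROM weather_data'
    conditions = []
    params = []
    if station_id is not None:
        conditions.append('station_id = ?')
        params.append(station_id)
    if date is not None:
        conditions.append('date = ?')
        params.append(date)
    if conditions:
        query += ' WHERE ' + ' AND '.join(conditions)
    # always paginate, ordered by the primary key so pages are stable
    query += ' ORDER BY id LIMIT ? OFFSET ?'
    params.extend([per_page, offset])

    # execute the query
    cursor = conn.execute(query, params)
    rows = cursor.fetchall()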
    results = []
    
    for row in rows:
        results.append({
            'date': row[2],
            'station_id': row[1],
            'max_temp': row[3],
            'min_temp': row[4],
            'precipitation': row[5]
        })

    return jsonify(results)


@app.route('/api/weather/stats', methods=['GET'])
def get_weather_stats():

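    # get page parameters
    page = max(request.args.get('page', default=1, type=int), 1)
    per_page = 10
    offset = (page - 1) * per_page

    # get query parameters (values that are not integers are treated as absent)
    station_id = request.args.get('station_id', type=int)

    # build the query with placeholders so user input is never interpolated into SQL
    query = 'SELECT * FROM weather_data_analysis'
    params = []
    if station_id is not None:
        query += ' WHERE station_id = ?'
        params.append(station_id)
    # always paginate, ordered by the primary key so pages are stable
    query += ' ORDER BY year, station_id LIMIT ? OFFSET ?'
    params.extend([per_page, offset])

    # execute the query
    cursor = conn.execute(query, params)
    rows = cursor.fetchall()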
    results = []

    for row in rows:
        results.append({
            'year': row[0],
            'station_id': row[1],
            'avg_max_temp': row[2],
            'avg_min_temp': row[3],
            'total_precipitation': row[4]
        })
    
    return jsonify(results)


if __name__ == '__main__':
    app.run(debug=True)
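
Both endpoints combine optional filters with pagination, which is easy to get wrong when the SQL string is assembled by hand. The sketch below isolates that logic in a pure function that can be tested without a running server. The name `build_query` is hypothetical, and the function assumes that table and column names come from trusted code while only the filter values come from the request.

```python
def build_query(table, filters, page=1, per_page=10):
    """Return (sql, params) for a filtered, paginated SELECT.

    `table` and the keys of `filters` must come from trusted code;
    only the filter values may originate from the request.
    """
    conditions = []
    params = []
    for column, value in filters.items():
        if value is not None:  # skip filters the client did not send
            conditions.append(f"{column} = ?")
            params.append(value)
    sql = f"SELECT * FROM {table}"
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    # Pagination is always applied, with or without filters
    sql += " LIMIT ? OFFSET ?"
    params.extend([per_page, (max(page, 1) - 1) * per_page])
    return sql, params


sql, params = build_query("weather_data", {"station_id": 1, "date": None}, page=3)
print(sql)     # SELECT * FROM weather_data WHERE station_id = ? LIMIT ? OFFSET ?
print(params)  # [1, 10, 20]
```

The returned pair can be passed straight to `conn.execute(sql, params)`, so the database driver handles quoting of the user-supplied values.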

In [None]:
%%writefile test_app.py
import pytest
from app import app

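def test_api_root():
    # The root endpoint returns a plain greeting and confirms the app is up
    with app.test_client() as client:
        response = client.get('/api')
        assert response.status_code == 200
        assert response.data == b'API for weather data!'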

In [None]:
%%writefile test_unit.py
import unittest
import requests
import json

class TestWeatherAPI(unittest.TestCase):
    """Integration tests; they expect the API to be running on 127.0.0.1:5000."""

    def setUp(self):
        self.base_url = 'http://127.0.0.1:5000/api/weather'
        # Pick a station ID that exists by reading it from the first page of results
        first_page = json.loads(requests.get(self.base_url).text)
        self.station_id = first_page[0]['station_id']

    def test_get_weather_by_date(self):
        # Test getting weather data by date
        response = requests.get(self.base_url + '?date=19850102')
        self.assertEqual(response.status_code, 200)
        results = json.loads(response.text)
        self.assertGreater(len(results), 0)
        for result in results:
            # Dates are stored as integers, so the JSON value is a number
            self.assertEqual(result['date'], 19850102)

    def test_get_weather_by_station_ID(self):
        # Test getting weather data by station ID
        response = requests.get(self.base_url + f'?station_id={self.station_id}')
        self.assertEqual(response.status_code, 200)
        results = json.loads(response.text)
        self.assertGreater(len(results), 0)
        for result in results:
            self.assertEqual(result['station_id'], self.station_id)

    def test_get_weather_stats_by_station_ID(self):
        # Test getting weather statistics by station ID
        response = requests.get(self.base_url + f'/stats?station_id={self.station_id}')
        self.assertEqual(response.status_code, 200)
        results = json.loads(response.text)
        self.assertGreater(len(results), 0)
        for result in results:
            self.assertEqual(result['station_id'], self.station_id)


if __name__ == '__main__':
    unittest.main()

In [None]:
%%writefile swagger.yaml
openapi: 3.0.0

info:
  title: Weather data analysis API
  contact:
    name: Data Engineering Candidate
  version: 1.0.0
  description: This API provides access to weather data for a given date or station ID. It also provides yearly weather statistics for a given station ID.

servers:
  - url: http://127.0.0.1:5000

paths:

  /api:
    get:
      summary: Server availability check
      description: Returns "API for weather data!" as text
      responses:
        '200':
          description: Successful operation
          content:
            text/html:
              schema:
                type: string
                example: API for weather data!
        '400':
          description: Invalid request


  /api/weather:
    get:
      summary: Retrieve weather data
      description: Returns weather data, optionally filtered by date and station ID. When no filter is given, all records are returned page by page.
      parameters:
        - in: query
          name: date
          schema:
            type: integer
          description: Date of the weather data to retrieve (YYYYMMDD)
        - in: query
          name: station_id
          schema:
            type: integer
          description: ID of the weather station
        - in: query
          name: page
          schema:
            type: integer
            default: 1
          description: Page number for pagination
      responses:
        '200':
          description: Successful operation
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    date:
                      type: integer
                      description: Date of the weather data (YYYYMMDD)
                    station_id:
                      type: integer
                      description: ID of the weather station
                    max_temp:
                      type: number
                      format: float
                      description: Maximum temperature (in degrees Celsius)
                    min_temp:
                      type: number
                      format: float
                      description: Minimum temperature (in degrees Celsius)
                    precipitation:
                      type: number
                      format: float
                      description: Precipitation (in millimeters)
        '400':
          description: Invalid request
  
  
  /api/weather/stats:
    get:
      summary: Retrieve weather statistics
      description: Returns yearly weather statistics, optionally filtered by station ID. When no filter is given, all records are returned page by page.
      parameters:
        - in: query
          name: station_id
          schema:
            type: integer
          description: ID of the weather station
        - in: query
          name: page
          schema:
            type: integer
            default: 1
          description: Page number for pagination
      responses:
        '200':
          description: Successful operation
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    year:
                      type: integer
                      description: Year of the weather data
                    station_id:
                      type: integer
                      description: ID of the weather station
                    avg_max_temp:
                      type: number
                      format: float
                      description: Average maximum temperature (in degrees Celsius)
                    avg_min_temp:
                      type: number
                      format: float
                      description: Average minimum temperature (in degrees Celsius)
                    total_precipitation:
                      type: number
                      format: float
                      description: Total precipitation (in centimeters)
        '400':
          description: Invalid request

## Extra credits

To deploy the Weather Data API on Amazon Web Services, I would use the following tools and services from Amazon:

1. EC2 (Elastic Compute Cloud): I would launch virtual servers to host the API and, initially, to run the scheduled version of the data ingestion code.
2. RDS (Relational Database Service): I would use it to host and manage the database behind the API. RDS does not offer SQLite as an engine, so I would migrate the schema to PostgreSQL.
3. API Gateway: I would use it to expose the API endpoints that let users query the weather data.
4. Lambda: This is a serverless compute service that runs code in response to events, including scheduled ones. It would let me run the data ingestion code on a schedule without keeping a server running.
5. CloudWatch: This is a monitoring service that provides visibility into cloud resources and applications. I would use it to monitor the API, the database, and the scheduled ingestion code.

With these tools and services, the initial infrastructure would consist of an EC2 instance hosting the API, an RDS instance for the database, and an EC2 instance running the scheduled ingestion code. I would eventually use CloudFormation to automate the deployment of these resources.

After finalizing the infrastructure, I would publish the API endpoints through API Gateway and connect the API to the RDS database. I would also set up CloudWatch alarms to monitor the API, the database, and the ingestion code, and later move the scheduled ingestion from EC2 to Lambda.

With this setup, users of my API would be able to query the weather data, and new weather data would be ingested into the database on a regular basis without duplicates.
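
As a rough illustration of the Lambda-based ingestion described above, the sketch below shows the shape of a Python handler that a schedule could invoke. `run_ingestion` is a hypothetical placeholder for the real ingestion code, and the empty event used in the local call is made up; only the `(event, context)` handler signature is the standard one for Python Lambda functions.

```python
import logging
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def run_ingestion():
    """Hypothetical placeholder: load new weather files and return the number of rows inserted."""
    return 0


def lambda_handler(event, context):
    """Entry point invoked by the schedule; logs timing and returns a small summary."""
    start = time.time()
    logger.info("Ingestion started")
    inserted = run_ingestion()
    elapsed = time.time() - start
    logger.info("Ingestion finished: %d rows in %.2f seconds", inserted, elapsed)
    return {"inserted": inserted, "elapsed_seconds": round(elapsed, 2)}


# Local smoke test with an empty event; Lambda supplies the real event and context objects
print(lambda_handler({}, None))
```

Because the handler only logs and returns a summary, the log lines would appear in CloudWatch, where an alarm could watch for failed or unusually slow runs.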