<a href="https://colab.research.google.com/github/luciewparis/repository_A/blob/main/First_Data_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Air Quality Pipeline
In this notebook, we will create a pipeline that will read data from a CSV file, clean the data, and then write the cleaned data to Cloud Storeage.

## Python Imports

In [None]:
import pandas as pd
import requests
import time

## Requesting Data.

For this challenge we are going to request data from the OpenAQ (Open Air Quality data) API.  
As a Warm UP, using the 👉 [Documentation](https://docs.openaq.org/docs/introduction) and the 👉 [Official reference](https://docs.openaq.org/reference/averages_v2_get_v2_averages_get) can you find the python code to get informations about **ALL** the country available with this API ?

In [None]:
# Insert your code to read general informations about all the countries from the API
# Use the request library and the /countries endpoint to do this


#response = # Insert your code here

<details><summary>Solution 💪</summary>
  <p>
   
  ```python

  url = "https://api.openaq.org/v2/countries?limit=1000&page=1&offset=0&sort=asc&order_by=name"

  headers = {
      "accept": "application/json",
      "content-type": "application/json"
  }

  response = requests.get(url, headers=headers)

  print(response.json())
  ```

  </p>
</details>

In [None]:
# Let's print this response to see how it looks like
countries = pd.DataFrame(response.json()["results"])
print(countries.shape)
countries

### Requesting Air Quality Data a city
We want to create an automated pipeline that automatically download data from the last year for a specific city
####  1 -  Date Format

The expected format for the API is "YYYY-MM-DD", `using time.now()` create a variable `date_from` that store the date 1 year from the current date in the correct format



In [None]:
# Get Date from one year ago using datetime
# You can use the datetime library to get the current date and time
# and then use timedelata to subtract 365 days from it to get the date from one year ago
# Finally, you can format the date as a string in the format "YYYY-MM-DD" using the .strftime method


date_from = None #

In [None]:
# Run the following code without modifying it to do the api call

endpoint =  "https://api.openaq.org/v2/measurements"

params = {
    "date_from": date_from,
    "limit": 1000, # Keeping 1000 as the limit for speed purposes
    "sort": "desc",
    "order_by": "datetime",
    "city": "PARIS 9E ARRONDISSEMENT"
}

headers = {"accept": "application/json"}

response = requests.get(endpoint, headers=headers,params=params)

res = pd.DataFrame(response.json()["results"])
res

#### 2 - Format the API result
Notice that we have several columns that are JSON object like the `date` and `coordinates`. We could keep in this way and store them like that but to simplify the logic needed we are going to extract the information from those columns.  
First let's extract only the `utc` from the date column.
You can use the `pd.Series().apply` method with a lambda function for this task ! Once it's done reassign the result to the date column

In [None]:
res["date"] = #TODO

In [None]:
# Run this cell to verify that the date column is a datetime object
from pandas.api.types import is_datetime64_any_dtype as is_datetime
assert is_datetime(res["date"]) == True , "Date column is not a datetime object, maybe you should verify your conversion"

Let's do the same with the `coordinates`columns, extract the lat and lon and put them respectively in a `latitude` and `longitude` column.
To try a different method you can use the `pd.json_normalize` function on res["coordinates]

In [None]:
# TODO Create a latitude and longitude column and drop the coordinates column

Run the following column to make sure your DataFramme is correct ✅

In [None]:
assert set(res.columns) == set(['locationId', 'location', 'parameter', 'value', 'date', 'unit',
       'country', 'city', 'isMobile', 'isAnalysis', 'entity', 'sensorType',
       'latitude', 'longitude']), "Columns are not the same as expected"

#### 3 - Save the Data to a Csv format
It's alaways usefull to have a local back up of your data when you build a Data Pipeline. Let's save your DataFrame to a csv !  
Create a data folder and then save your date in a `data/air_quality.csv`

In [None]:
# TODO


#### 4 - Upload Data to Cloud Storage

We provide you with the python code to load your DataFrame, you can find it in the official 👉 [GCP Documentation](https://cloud.google.com/storage/docs/uploading-objects-from-memory)

Before you dive in the code :
* Go to GCP and create a new bucket. 👉 [Create a Bucket](https://console.cloud.google.com/storage/create-bucket)
  Remember, a bucket name must be globaly unique !
  * Choose Multi-Region -> EU
  * Default Storage Class -> Standard
  * Leave all the other parameters as default
* Now, we need to create a service account to be able to load data using Python
  * Go to the 👉[Service account page](https://console.cloud.google.com/iam-admin/serviceaccounts/create)
  * Name your service account `bucket-admin`
  * Give it the role `Storage Object Admin`

    <details><summary>Service Account Role</summary>
      <p>

      <img src="https://storage.googleapis.com/schoolofdata-images/Data-Sourcing.Data-Pipelines/service-account-role.png"  />
      
      </p>
    </details>

* The final step needed is to create A JSON Key for your service account
  * Click on freshly created service account
  
    <details><summary>Details</summary>
      <p>

      <img src="https://storage.googleapis.com/schoolofdata-images/Data-Sourcing.Data-Pipelines/click.png"  />
      </p>
    </details>

  * Click on KEY > ADD KEY > Create new key
    <details><summary>Details</summary>
      <p>

      <img src="https://storage.googleapis.com/schoolofdata-images/Data-Sourcing.Data-Pipelines/Create%20Key.png"  />
      <
      </p>
    </details>

  * Click on Create, the Private key should automatically be download on your computer


In [None]:
# If you are running this Notebook on collab, uncomment this cell to upload the key
# from google.colab import files
# files.upload()

In [None]:
# Now we need to specify to google where the credentials are stored
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "REPLACE ME WITH THE CORRECT PATH"

In [None]:
from google.cloud import storage

# REPLACE THIS WITH YOUR OWN GOOGLE CLOUD STORAGE BUCKET NAME
bucket_name = "TODO:REPLACEBYCLOUDSTORAGEBUCKET"
# REPLACE 'storage-object-name' with the name you want the file to have in GCS
destination_blob_name = "storage-object-name"


# Leave the rest of the code as it is 😊
from google.cloud import storage
def upload_blob_from_memory(bucket_name, contents, destination_blob_name):

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_string(contents)

    print(
        f"{destination_blob_name} with contents uploaded to {bucket_name}. ✅"
    )
upload_blob_from_memory(bucket_name, res.to_csv(),destination_blob_name)

Congratulations! You have successfully completed this manual pipeline notebook. This is a simple pipeline that reads data from an API, processes it, and uploads it to Google Cloud Storage. In the next step we will try to automate and schedule this process, based on your current knowledge of GCP do you know wich services we are going to use ?
<details>
<summary>Answer 🧙‍♀️</summary>
  <p>
  
  There is a lot of possibilities and not a single infrastructure solution. But we can design and build a robust pipeline using Cloud function, Cloud scheduller and Pub Sub !

  </p>
</details>