# Data Pipeline Using an LLM

Go to https://groq.com/ and generate a free API key.


1. Data Cleaning:

  Begin by loading the dataset into your Colab environment.
  Use pandas functions like head(), info(), describe(), and value_counts() to explore the structure, data types, and basic statistics of the dataset.

  Identify potential data quality issues such as missing values, inconsistent formats, or incorrect entries.

  Prompt Engineering:

  This is the core of the lab. Your task is to craft a prompt that instructs an LLM hosted on Groq (the example code uses Meta's Llama 3 8B model, llama3-8b-8192) to clean the data.

  The Cleaning Goals: Your prompt should guide the LLM to perform the following tasks:

  * Address missing values: Infer or fill in missing information where possible (e.g., city names from addresses).
  * Standardize text: Correct spelling, apply consistent capitalization, and ensure uniformity in categorical values.
  * Validate and format: Ensure that addresses are in a standard format (e.g., "Street, Borough, NY"), and that dates and times follow ISO 8601.
  * Categorize: Assign clear categories to ambiguous complaint descriptions (e.g., "Noise" and "Non-Noise").

  You are not given the prompt used in the example code, but you are given the expected results.

  Iterative Refinement: Start with a basic prompt and gradually refine it based on the LLM's output. Observe how the LLM responds and make adjustments to improve the cleaning process.

2. Data Validation:

  After cleaning the data, write unit tests (using Python's assert statements) to validate the output.
  Your tests should check data types, value ranges, and ensure that required fields are not null.
  You can also have the LLM generate the test code; run it and note any problems that come up.
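A minimal sketch of the kinds of assert-based checks described above (required fields, data types, value ranges, formats), run on a tiny hand-made DataFrame with illustrative values. The column names follow the 311 dataset; the latitude/longitude bounds (a rough box around New York City) and the ISO 8601 pattern (which also accepts fractional seconds such as `.000`) are assumptions, not official limits, and `validate_cleaned` is a hypothetical helper.

```python
import re

import pandas as pd

# Rough bounding box around New York City (assumption, not an official boundary)
NYC_LAT = (40.4, 41.0)
NYC_LON = (-74.3, -73.6)

# ISO 8601 date-time without timezone, optionally with fractional seconds
ISO_8601 = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?$"


def validate_cleaned(df: pd.DataFrame) -> None:
    """Raise AssertionError on the first failed check."""
    # Required fields are not null
    for field in ["unique_key", "created_date", "complaint_type", "borough"]:
        assert df[field].notnull().all(), f"{field} has null values"

    # Data types
    assert pd.api.types.is_integer_dtype(df["unique_key"]), "unique_key should be integer"
    assert pd.api.types.is_float_dtype(df["latitude"]), "latitude should be float"

    # Value ranges (both bounds inclusive)
    assert df["latitude"].between(*NYC_LAT).all(), "latitude outside NYC range"
    assert df["longitude"].between(*NYC_LON).all(), "longitude outside NYC range"

    # Formats and uniqueness
    assert df["created_date"].astype(str).str.match(ISO_8601).all(), "created_date not ISO 8601"
    assert df["unique_key"].is_unique, "unique_key has duplicates"


# Illustrative, made-up rows
sample = pd.DataFrame({
    "unique_key": [1, 2],
    "created_date": ["2025-04-18T01:35:22.000", "2025-04-18T01:34:18"],
    "complaint_type": ["Illegal Parking", "Noise - Street/Sidewalk"],
    "borough": ["BRONX", "BRONX"],
    "latitude": [40.88, 40.85],
    "longitude": [-73.91, -73.80],
})
validate_cleaned(sample)
print("sample passed")
```

Because each check carries a message, a failure names the column at fault rather than just raising a bare `AssertionError`.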

Submission: Write your prompts in a text file and upload it to the LMS.

In [None]:
# Groq-Powered Data Engineering Pipeline

# Step 1: Install Required Libraries
!pip install groq itables



In [None]:
# Step 2: Import Libraries
from groq import Groq
import pandas as pd
from itables import init_notebook_mode
from google.colab import userdata
import json
import re
from tqdm import tqdm
import itables

init_notebook_mode(all_interactive=True)

In [None]:
# Load a manageable sample (500 rows) for this lab
url = "https://data.cityofnewyork.us/resource/erm2-nwe9.csv?$limit=500"
df = pd.read_csv(url)
df

unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,street_name,cross_street_1,cross_street_2,intersection_street_1,intersection_street_2,address_type,city,landmark,facility_type,status,due_date,resolution_description,resolution_action_updated_date,community_board,bbl,borough,x_coordinate_state_plane,y_coordinate_state_plane,open_data_channel_type,park_facility_name,park_borough,vehicle_type,taxi_company_borough,taxi_pick_up_location,bridge_highway_name,bridge_highway_direction,road_ramp,bridge_highway_segment,latitude,longitude,location

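The URL above passes the Socrata `$limit` query parameter to the NYC Open Data API. As a sketch, assuming the same endpoint also honors the standard SoQL `$offset` and `$where` parameters, you could page through more rows or filter on the server. `build_311_url` is a hypothetical helper and the filter shown is only illustrative.

```python
from urllib.parse import quote, urlencode

BASE_URL = "https://data.cityofnewyork.us/resource/erm2-nwe9.csv"


def build_311_url(limit=500, offset=0, where=None):
    """Build a SoQL query URL; `where` is an optional SoQL filter expression."""
    params = {"$limit": limit, "$offset": offset}
    if where:
        params["$where"] = where
    # Keep '$' literal in parameter names; percent-encode spaces, quotes and '%' in values
    return f"{BASE_URL}?{urlencode(params, quote_via=quote, safe='$')}"


print(build_311_url(limit=100, offset=200))
print(build_311_url(limit=50, where="complaint_type like 'Noise%'"))
```

The resulting string can be passed to `pd.read_csv` exactly like the hard-coded URL.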

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 41 columns):
 #   Column                          Non-Null Count  Dtype  
---  ------                          --------------  -----  
 0   unique_key                      500 non-null    int64  
 1   created_date                    500 non-null    object 
 2   closed_date                     156 non-null    object 
 3   agency                          500 non-null    object 
 4   agency_name                     500 non-null    object 
 5   complaint_type                  500 non-null    object 
 6   descriptor                      497 non-null    object 
 7   location_type                   477 non-null    object 
 8   incident_zip                    498 non-null    float64
 9   incident_address                494 non-null    object 
 10  street_name                     493 non-null    object 
 11  cross_street_1                  477 non-null    object 
 12  cross_street_2                  476 

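`df.info()` reports non-null counts, but spotting inconsistent categorical values (for example `BRONX` vs `Bronx`) takes a little more work. A minimal sketch, with a hypothetical helper `profile_quality`, that reports the percentage of missing values per column and flags text columns whose values differ only by case or surrounding whitespace:

```python
import pandas as pd


def profile_quality(df: pd.DataFrame) -> dict:
    """Summarize missing values and case/whitespace inconsistencies per column."""
    missing_pct = (df.isna().mean() * 100).round(1).to_dict()

    inconsistent = {}
    for col in df.columns:
        series = df[col]
        if not (series.dtype == object or pd.api.types.is_string_dtype(series)):
            continue  # only text columns can have spelling/case variants
        values = series.dropna().astype(str)
        normalized = values.str.strip().str.lower()
        # More raw variants than normalized ones means one value is written several ways
        if values.nunique() > normalized.nunique():
            inconsistent[col] = sorted(values.unique())
    return {"missing_pct": missing_pct, "inconsistent": inconsistent}


demo = pd.DataFrame({
    "borough": ["BRONX", "Bronx", "BROOKLYN", None],
    "complaint_type": ["Noise - Residential", "Illegal Parking", "Noise - Residential", "Illegal Parking"],
})
report = profile_quality(demo)
print(report["missing_pct"])   # borough is 25% missing
print(report["inconsistent"])  # only 'borough' is flagged
```

Running `profile_quality(df)` on the 500-row sample gives a quick list of columns worth targeting in the cleaning prompt.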
In [None]:
df



In [None]:
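# Read the key from Colab Secrets (key icon in the left sidebar) instead of hard-coding it.
# Assumes a secret named GROQ_API_KEY with notebook access enabled.
client = Groq(api_key=userdata.get("GROQ_API_KEY"))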

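Reading the key from Colab Secrets only works inside Colab. A sketch of a helper that tries Colab Secrets first and falls back to an environment variable, so the same notebook also runs locally. It assumes both the secret and the variable are named `GROQ_API_KEY` (a naming choice, not a requirement); `get_groq_api_key` is hypothetical.

```python
import os


def get_groq_api_key(name: str = "GROQ_API_KEY") -> str:
    """Return the API key from Colab Secrets if available, else from an environment variable."""
    key = None
    try:
        from google.colab import userdata  # only importable inside Colab
    except ImportError:
        userdata = None
    if userdata is not None:
        try:
            key = userdata.get(name)
        except Exception:  # e.g. secret missing or notebook access not granted
            key = None
    key = key or os.environ.get(name)
    if not key:
        raise RuntimeError(f"Set {name} in Colab Secrets or as an environment variable.")
    return key
```

Usage: `client = Groq(api_key=get_groq_api_key())`.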
In [None]:
def llm_complex_clean(record):
    """Ask the LLM to clean one DataFrame row; return the cleaned record as a dict, or None."""
    # Convert NaN to None so the prompt shows null instead of the bare token `nan`
    record_dict = {key: (None if pd.isna(value) else value) for key, value in record.items()}

    prompt = f"""

    You are a data cleaning expert. Given the following record from the New York City 311 incident reports:

    {json.dumps(record_dict, default=str)}

    Perform the following cleaning tasks:

    1. **Address Missing Values:** Infer or fill in missing values for 'city' using the 'incident_address' field. For example, if 'city' is missing but the address contains 'New York', fill in 'city' as 'New York'.
    2. **Standardize Text:**
       - Determine all categorical columns
       - Correct any spelling errors in these columns
       - Apply consistent capitalization. For example, all city names should start with capital letters
    3. **Validate and Format:**
       - Ensure that addresses are in a standard format, such as "Street, Borough, NY".
       - Format dates and times according to the ISO 8601 standard (YYYY-MM-DDTHH:MM:SS).
    4. **Categorize:**
       - Add a new key named 'complaint_category' based on complaint_type and descriptor: 'Noise' for noise-related complaints, 'Non-Noise' for all others.

    Return only the cleaned record as a single JSON object, with no text before or after it.
    """

    chat_completion = client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )

    response_text = chat_completion.choices[0].message.content.strip()
    cleaned_record = extract_dict_from_response(response_text)

    # Print the parsed record, or the raw reply when parsing failed, to guide prompt refinement
    print(cleaned_record if cleaned_record is not None else response_text)
    return cleaned_record

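The loop below runs on only 10 rows, partly because of API limits. A retry wrapper with exponential backoff is a common way to ride out transient errors such as rate limiting. This is a sketch: the name `call_with_retries` and its defaults are assumptions, and it retries on any exception rather than inspecting Groq's specific error types.

```python
import time


def call_with_retries(fn, *args, max_attempts=3, base_delay=1.0, sleep=time.sleep, **kwargs):
    """Call fn(*args, **kwargs), retrying with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(max_attempts):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
```

Usage inside the loop: `cleaned_record = call_with_retries(llm_complex_clean, row)`. The `sleep` parameter is injectable so the backoff schedule can be tested without actually waiting.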

In [None]:
# Feel free to define any number of helper functions.
import ast

def extract_dict_from_response(response_string):
    """
    Extracts the first dictionary-like object from an LLM response.

    Tries strict JSON first, then falls back to a Python literal (single quotes,
    None/True/False), which the model may return instead of JSON. This avoids
    blind string replacements (' -> " and None -> null), which corrupt values
    containing apostrophes or the word "None".

    Args:
    response_string: The raw text returned by the model.

    Returns:
    The extracted dictionary, or None if nothing could be parsed.
    """
    # Greedy match from the first '{' to the last '}' so the whole object is captured
    match = re.search(r"\{.*\}", response_string, re.DOTALL)
    if not match:
        print("No dictionary structure found in the response.")
        return None

    candidate = match.group(0)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass

    try:
        # literal_eval only evaluates Python literals, never arbitrary code
        result = ast.literal_eval(candidate)
        return result if isinstance(result, dict) else None
    except (ValueError, SyntaxError) as e:
        print(f"Error parsing dictionary: {e}")
        return None

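Parsing can succeed while the content is still off: the model may drop fields or invent new ones (the cleaned table later in this notebook ends up with both `category` and `complaint_category` columns). A minimal sketch of a key check against the original record; `check_keys` is a hypothetical helper, and `complaint_category` is assumed to be the one new key you asked for.

```python
def check_keys(original: dict, cleaned: dict, allowed_new=("complaint_category",)) -> dict:
    """Report keys the LLM dropped and keys it added beyond those explicitly allowed."""
    original_keys, cleaned_keys = set(original), set(cleaned)
    return {
        "missing": sorted(original_keys - cleaned_keys),
        "unexpected": sorted(cleaned_keys - original_keys - set(allowed_new)),
    }


original = {"unique_key": 1, "city": None, "borough": "BRONX"}
cleaned = {"unique_key": 1, "borough": "Bronx", "category": "Non-Noise"}
print(check_keys(original, cleaned))
# {'missing': ['city'], 'unexpected': ['category']}
```

In the cleaning loop this could be called as `check_keys(row.to_dict(), cleaned_record)` to log records that need a closer look.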
In [None]:
cleaned_records = []
sample_df = df.head(10)  # Start with 10 rows due to complexity & API rate limits

for idx, row in tqdm(sample_df.iterrows(), total=len(sample_df)):
    try:
        cleaned_record = llm_complex_clean(row)
        if cleaned_record is not None:  # skip rows whose response could not be parsed
            cleaned_records.append(cleaned_record)
    except Exception as e:
        print(f"Error cleaning row {idx}: {e}")

cleaned_df = pd.DataFrame(cleaned_records)
cleaned_df.head(10)

 10%|█         | 1/10 [00:00<00:08,  1.02it/s]

{'unique_key': 64677480, 'created_date': '2025-04-18T01:35:22.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Illegal Parking', 'descriptor': 'Double Parked Blocking Vehicle', 'location_type': 'Street/Sidewalk', 'incident_zip': 10463.0, 'incident_address': '3435 GILES PLACE, BRONX, NY', 'street_name': 'GILES PLACE', 'cross_street_1': 'CANNON PLACE', 'cross_street_2': 'SEDGWICK AVENUE', 'intersection_street_1': 'CANNON PLACE', 'intersection_street_2': 'SEDGWICK AVENUE', 'address_type': 'ADDRESS', 'city': 'BRONX', 'landmark': 'GILES PLACE', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '08 BRONX', 'bbl': 2032580228.0, 'borough': 'BRONX', 'x_coordinate_state_plane': 1012598.0, 'y_coordinate_state_plane': 260247.0, 'open_data_channel_type': 'PHONE', 'park_facility_name': 'Unspecified', 'park_borough': 'BRONX', 'vehi

 20%|██        | 2/10 [00:01<00:07,  1.08it/s]

{'unique_key': 64672937, 'created_date': '2025-04-18T01:34:18.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Street/Sidewalk', 'descriptor': 'Loud Music/Party', 'location_type': 'Street/Sidewalk', 'incident_zip': 10464.0, 'incident_address': '219 FORDHAM STREET, BRONX, NY', 'street_name': 'FORDHAM STREET', 'cross_street_1': 'FORDHAM PLACE', 'cross_street_2': 'FORDHAM PLACE', 'intersection_street_1': 'FORDHAM PLACE', 'intersection_street_2': 'FORDHAM PLACE', 'address_type': 'ADDRESS', 'city': 'BRONX', 'landmark': 'FORDHAM STREET', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '10 BRONX', 'bbl': 2056440258.0, 'borough': 'BRONX', 'x_coordinate_state_plane': 1044134.0, 'y_coordinate_state_plane': 248271.0, 'open_data_channel_type': 'ONLINE', 'park_facility_name': 'Unspecified', 'park_borough': 'BRONX', 'veh

 30%|███       | 3/10 [00:02<00:06,  1.12it/s]

{'unique_key': 64679960, 'created_date': '2025-04-18T01:33:57.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Street/Sidewalk', 'descriptor': 'Loud Music/Party', 'location_type': 'Street/Sidewalk', 'incident_zip': 10035.0, 'incident_address': '523 EAST 117 STREET, NEW YORK, NY', 'street_name': 'EAST 117 STREET', 'cross_street_1': 'PLEASANT AVENUE', 'cross_street_2': 'UNNAMED STREET', 'intersection_street_1': 'PLEASANT AVENUE', 'intersection_street_2': 'UNNAMED STREET', 'address_type': 'ADDRESS', 'city': 'NEW YORK', 'landmark': 'EAST 117 STREET', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': '2025-04-18T02:21:19.000', 'community_board': '11 MANHATTAN', 'bbl': 1017160008.0, 'borough': 'MANHATTAN', 'x_coordinate_state_plane': 1002979.0, 'y_coordinate_state_plane': 229147.0, 'open_data_channel_type': 'ONLINE', 'park_facility_name': 

 40%|████      | 4/10 [00:05<00:10,  1.80s/it]

{'unique_key': 64675055, 'created_date': '2025-04-18T01:33:38.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Banging/Pounding', 'location_type': 'Residential Building/House', 'incident_zip': 11226.0, 'incident_address': '31 East Street, Brooklyn, NY', 'street_name': 'East 31 Street', 'cross_street_1': 'Clarendon Road', 'cross_street_2': 'Avenue D', 'intersection_street_1': 'Clarendon Road', 'intersection_street_2': 'Avenue D', 'address_type': 'ADDRESS', 'city': 'Brooklyn', 'landmark': 'East 31 Street', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '17 Brooklyn', 'bbl': 3049470035.0, 'borough': 'Brooklyn', 'x_coordinate_state_plane': 998781.0, 'y_coordinate_state_plane': 173109.0, 'open_data_channel_type': 'PHONE', 'park_facility_name': 'Unspecified', 'park_borough': 'Brookly

 50%|█████     | 5/10 [00:19<00:30,  6.17s/it]

{'unique_key': 64680154, 'created_date': '2025-04-18T01:33:25.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Loud Music/Party', 'location_type': 'Residential Building/House', 'incident_zip': 10467.0, 'incident_address': '3505 Rochambeau Avenue, Bronx, NY', 'street_name': 'Rochambeau Avenue', 'cross_street_1': 'East Gun Hill Road', 'cross_street_2': 'East 212 Street', 'intersection_street_1': 'East Gun Hill Road', 'intersection_street_2': 'East 212 Street', 'address_type': 'ADDRESS', 'city': 'Bronx', 'landmark': 'Rochambeau Avenue', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '07 Bronx', 'bbl': 2033280125.0, 'borough': 'BRONX', 'x_coordinate_state_plane': 1017647.0, 'y_coordinate_state_plane': 260607.0, 'open_data_channel_type': 'ONLINE', 'park_facility_name': 'Unspecified'

 60%|██████    | 6/10 [00:33<00:35,  8.85s/it]

{'unique_key': 64674005, 'created_date': '2025-04-18T01:33:24.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Banging/Pounding', 'location_type': 'Residential Building/House', 'incident_zip': 11214.0, 'incident_address': '83 STREET, BROOKLYN, NY', 'street_name': '83 STREET', 'cross_street_1': '18 AVENUE', 'cross_street_2': '19 AVENUE', 'intersection_street_1': '18 AVENUE', 'intersection_street_2': '19 AVENUE', 'address_type': 'ADDRESS', 'city': 'BROOKLYN', 'landmark': '83 STREET', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '11 BROOKLYN', 'bbl': 3063150039.0, 'borough': 'BROOKLYN', 'x_coordinate_state_plane': 984421.0, 'y_coordinate_state_plane': 160824.0, 'open_data_channel_type': 'PHONE', 'park_facility_name': 'Unspecified', 'park_borough': 'BROOKLYN', 'vehicle_type': Non

 70%|███████   | 7/10 [00:34<00:18,  6.26s/it]

{'unique_key': 64681084, 'created_date': '2025-04-18T01:33:22.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Loud Talking', 'location_type': 'Residential Building/House', 'incident_zip': 11239.0, 'incident_address': '135 ELMIRA LOOP, BROOKLYN, NY', 'street_name': 'ELMIRA LOOP', 'cross_street_1': 'SCHROEDERS AVENUE', 'cross_street_2': 'BEND', 'intersection_street_1': 'SCHROEDERS AVENUE', 'intersection_street_2': 'BEND', 'address_type': 'ADDRESS', 'city': 'BROOKLYN', 'landmark': 'ELMIRA LOOP', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '05 BROOKLYN', 'bbl': 3044520085.0, 'borough': 'BROOKLYN', 'x_coordinate_state_plane': 1017337.0, 'y_coordinate_state_plane': 175737.0, 'open_data_channel_type': 'PHONE', 'park_facility_name': 'Unspecified', 'park_borough': 'BROOKLYN', 'vehic

 80%|████████  | 8/10 [00:35<00:09,  4.54s/it]

{'unique_key': 64677685, 'created_date': '2025-04-18T01:32:48.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Street/Sidewalk', 'descriptor': 'Loud Music/Party', 'location_type': 'Street/Sidewalk', 'incident_zip': 10464.0, 'incident_address': '219 FORDHAM STREET, BRONX, NY', 'street_name': 'FORDHAM STREET', 'cross_street_1': 'FORDHAM PLACE', 'cross_street_2': 'FORDHAM PLACE', 'intersection_street_1': 'FORDHAM PLACE', 'intersection_street_2': 'FORDHAM PLACE', 'address_type': 'ADDRESS', 'city': 'BRONX', 'landmark': 'FORDHAM STREET', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': None, 'community_board': '10 BRONX', 'bbl': 2056440258.0, 'borough': 'BRONX', 'x_coordinate_state_plane': 1044134.0, 'y_coordinate_state_plane': 248271.0, 'open_data_channel_type': 'MOBILE', 'park_facility_name': 'Unspecified', 'park_borough': 'BRONX', 'veh

 90%|█████████ | 9/10 [00:36<00:03,  3.40s/it]

{'unique_key': 64681329, 'created_date': '2025-04-18T01:32:47.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Loud Music/Party', 'location_type': 'Residential Building/House', 'incident_zip': 11216.0, 'incident_address': '671 MARCY AVENUE, BROOKLYN, NY', 'street_name': 'MARCY AVENUE', 'cross_street_1': 'DEKALB AVENUE', 'cross_street_2': 'KOSCIUSZKO STREET', 'intersection_street_1': 'DEKALB AVENUE', 'intersection_street_2': 'KOSCIUSZKO STREET', 'address_type': 'ADDRESS', 'city': 'BROOKLYN', 'landmark': 'MARCY AVENUE', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': '2025-04-18T02:24:16.000', 'community_board': '03 BROOKLYN', 'bbl': 3017800105.0, 'borough': 'BROOKLYN', 'x_coordinate_state_plane': 998558.0, 'y_coordinate_state_plane': 191179.0, 'open_data_channel_type': 'MOBILE', 'park_facility_name': 'Un

100%|██████████| 10/10 [00:38<00:00,  3.86s/it]

{'unique_key': 64678942, 'created_date': '2025-04-18T01:32:11.000', 'closed_date': None, 'agency': 'NYPD', 'agency_name': 'New York City Police Department', 'complaint_type': 'Noise - Residential', 'descriptor': 'Loud Music/Party', 'location_type': 'Residential Building/House', 'incident_zip': 10022.0, 'incident_address': '441 East 57th Street, Manhattan, NY', 'street_name': 'East 57th Street', 'cross_street_1': '1 Avenue', 'cross_street_2': 'Sutton Place', 'intersection_street_1': '1 Avenue', 'intersection_street_2': 'Sutton Place', 'address_type': 'ADDRESS', 'city': 'New York', 'landmark': 'East 57th Street', 'facility_type': None, 'status': 'In Progress', 'due_date': None, 'resolution_description': None, 'resolution_action_updated_date': '2025-04-18T01:49:36.000', 'community_board': '06 Manhattan', 'bbl': 1013697502.0, 'borough': 'Manhattan', 'x_coordinate_state_plane': 994795.0, 'y_coordinate_state_plane': 215404.0, 'open_data_channel_type': 'PHONE', 'park_facility_name': 'Unspecif




unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,street_name,cross_street_1,cross_street_2,intersection_street_1,intersection_street_2,address_type,city,landmark,facility_type,status,due_date,resolution_description,resolution_action_updated_date,community_board,bbl,borough,x_coordinate_state_plane,y_coordinate_state_plane,open_data_channel_type,park_facility_name,park_borough,vehicle_type,taxi_company_borough,taxi_pick_up_location,bridge_highway_name,bridge_highway_direction,road_ramp,bridge_highway_segment,latitude,longitude,location,category,complaint_category

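When refining the prompt, it helps to see exactly which fields the model changed, for example to catch cases like record 64675055 above, where `incident_address` came back as '31 East Street, Brooklyn, NY' while `street_name` is 'East 31 Street'. A minimal sketch (`diff_record` is a hypothetical helper) that lists changed fields and treats NaN and None as the same missing value:

```python
import math


def _is_missing(value) -> bool:
    return value is None or (isinstance(value, float) and math.isnan(value))


def diff_record(original: dict, cleaned: dict) -> dict:
    """Return {field: (before, after)} for every field whose value changed."""
    changes = {}
    for key in original.keys() | cleaned.keys():
        before, after = original.get(key), cleaned.get(key)
        if _is_missing(before) and _is_missing(after):
            continue  # NaN -> None is not a real change
        if before != after:
            changes[key] = (before, after)
    return dict(sorted(changes.items()))


before = {"city": float("nan"), "borough": "BRONX", "status": "In Progress", "landmark": None}
after = {"city": "Bronx", "borough": "BRONX", "status": "In Progress", "landmark": None}
print(diff_record(before, after))
# {'city': (nan, 'Bronx')}
```

Usage: `diff_record(df.iloc[0].to_dict(), cleaned_records[0])` shows what the model did to the first row.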

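The table header above shows the model returned its label under two different names, `category` and `complaint_category`. A sketch for merging them and cross-checking against a simple rule. The rule is an assumption: a complaint counts as noise when `complaint_type` starts with "Noise", which matches the types seen in this sample but is not an official taxonomy. `reconcile_category` is hypothetical.

```python
import pandas as pd


def reconcile_category(df: pd.DataFrame) -> pd.DataFrame:
    """Merge `category` into `complaint_category` and flag disagreements with a rule-based label."""
    out = df.copy()
    if "category" in out.columns:
        if "complaint_category" in out.columns:
            out["complaint_category"] = out["complaint_category"].fillna(out["category"])
        else:
            out["complaint_category"] = out["category"]
        out = out.drop(columns="category")
    # Rule-based baseline (assumption): noise complaint types start with "Noise"
    rule = out["complaint_type"].str.startswith("Noise").map({True: "Noise", False: "Non-Noise"})
    out["category_mismatch"] = out["complaint_category"] != rule
    return out


demo = pd.DataFrame({
    "complaint_type": ["Noise - Residential", "Illegal Parking", "Noise - Street/Sidewalk"],
    "category": [None, "Non-Noise", None],
    "complaint_category": ["Noise", None, "Non-Noise"],
})
result = reconcile_category(demo)
print(result[["complaint_category", "category_mismatch"]])
```

Rows with `category_mismatch == True` are the ones where the LLM's label and the simple rule disagree, which is a useful place to look when refining the prompt.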
## Data Validation

In [None]:
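def generate_complex_validation_tests(record):
    prompt = f"""
    Given the cleaned NYC 311 data record below:

    {record}

    Write Python unit tests using plain assert statements for a pandas DataFrame named
    cleaned_df whose rows look like this record. The tests should check:
    - data types (e.g., unique_key is an integer, latitude and longitude are floats)
    - value ranges (e.g., latitude and longitude fall within New York City)
    - that required fields such as unique_key, created_date and complaint_type are not null

    Return only the Python code, with no explanation.
    """

    chat_completion = client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )

    return chat_completion.choices[0].message.content.strip()

# Generate tests based on the first cleaned record (cleaned_df, not the raw df)
test_code = generate_complex_validation_tests(cleaned_df.iloc[0].to_dict())
print(test_code)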

This is a JSON object representing a single record from the NYC 311 data. Here's a breakdown of the fields:

1. `unique_key`: a unique identifier for the record, in this case 64677480.
2. `created_date`: the date and time the record was created, in the format `YYYY-MM-DDTHH:MM:SS.SSS`.
3. `closed_date`: the date and time the record was closed, which is not applicable in this case since the record is still open.
4. `agency`: the agency responsible for handling the complaint, which is the NYPD (New York City Police Department).
5. `agency_name`: the full name of the agency, which is also the NYPD.
6. `complaint_type`: the type of complaint, which is "Illegal Parking".
7. `descriptor`: a brief description of the complaint, which is "Double Parked Blocking Vehicle".
8. `location_type`: the type of location where the complaint was reported, which is "Street/Sidewalk".
9. `incident_zip`: the zip code of the incident, which is 10463.
10. `incident_address`: the full address of the incident, w

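Chat models often wrap generated code in a Markdown fence or surround it with explanations, so passing the raw reply to `exec` tends to fail with a `SyntaxError`. A minimal sketch (the helper `extract_code` is hypothetical) that pulls out the body of the first fenced block and falls back to the whole reply when there is none:

```python
import re

# Three backticks, built this way so the example does not close its own Markdown fence
FENCE = "`" * 3
# A fence, an optional language tag, a newline, then the shortest body up to the closing fence
CODE_BLOCK = re.compile(r"`{3}[\w+-]*[ \t]*\n(.*?)`{3}", re.DOTALL)


def extract_code(response_text: str) -> str:
    """Return the body of the first fenced code block, or the whole text if there is none."""
    match = CODE_BLOCK.search(response_text)
    return match.group(1).strip() if match else response_text.strip()


reply = f"Here are your tests:\n{FENCE}python\nassert 1 + 1 == 2\n{FENCE}\nHope this helps!"
code = extract_code(reply)
print(code)  # assert 1 + 1 == 2
exec(code)
```

Applied here, that would be `exec(extract_code(test_code))`. Review generated code before executing it, since `exec` runs whatever the model wrote.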
In [None]:
# Evaluate tests programmatically (OPTIONAL)

test_code = """
def test_data_types(cleaned_df):
    assert cleaned_df['unique_key'].dtype == 'int64'
    # Check that every created_date parses as a datetime without overwriting the column
    parsed_dates = pd.to_datetime(cleaned_df['created_date'], errors='coerce')
    assert parsed_dates.notna().all(), "created_date has unparseable values"
    assert pd.api.types.is_float_dtype(cleaned_df['latitude'])
    assert pd.api.types.is_float_dtype(cleaned_df['longitude'])

def test_null_values(cleaned_df):
    required_fields = ['unique_key', 'created_date', 'agency_name', 'complaint_type','incident_address','city','longitude','latitude','location']
    for field in required_fields:
        assert cleaned_df[field].notnull().all(), f"{field} should not have null values"


test_data_types(cleaned_df)
test_null_values(cleaned_df)
print("All tests passed!")
"""

exec(test_code)


All tests passed!
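With plain `assert` statements, the first failure stops the run and hides every check after it. A small sketch of a runner (`run_tests` and the two `check_*` functions are hypothetical) that calls each test separately and collects all failures into one report:

```python
import pandas as pd


def run_tests(tests, df):
    """Run each test(df) independently; return {test_name: error message} for failures."""
    failures = {}
    for test in tests:
        try:
            test(df.copy())  # pass a copy so one test cannot alter the data seen by the next
        except AssertionError as e:
            failures[test.__name__] = str(e) or "assertion failed"
        except Exception as e:  # e.g. KeyError for a column the LLM dropped
            failures[test.__name__] = f"{type(e).__name__}: {e}"
    return failures


def check_city_not_null(df):
    assert df["city"].notnull().all(), "city should not have null values"


def check_has_location(df):
    assert df["location"].notnull().all()


demo = pd.DataFrame({"city": ["Bronx", None]})
print(run_tests([check_city_not_null, check_has_location], demo))
```

After `exec(test_code)`, the same runner works with the notebook's tests: `run_tests([test_data_types, test_null_values], cleaned_df)`.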
