In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [1]:
!pip install -q findspark

In [2]:
import os

# Set environment variables for Java and Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"  # Update if path changes


In [4]:
import findspark
findspark.init()

# Start a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("GoogleColabSpark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

# Verify Spark version
print(spark.version)

3.4.1


In [5]:
from pyspark.sql import SparkSession
import logging
import os
from tabulate import tabulate  # Import tabulate for better table formatting

# Set up the logger
log_file = 'output.log'

# If the log file already exists, remove it to create a fresh file
if os.path.exists(log_file):
    os.remove(log_file)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Create a file handler to write log messages to a file (overwrite the log file each time)
# Specify utf-8 encoding to handle special characters
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')  # Use 'w' mode to overwrite the log file
file_handler.setLevel(logging.INFO)

# Create a log formatter
formatter = logging.Formatter('%(message)s')
file_handler.setFormatter(formatter)

# Add the file handler to the logger
logger.addHandler(file_handler)

# Function to log and print information messages (not DataFrame content)
def log_message(message=None, title=None):
    if title:
        logger.info(f"=== {title} ===")
    if message:
        logger.info(message)
    #print(message)  # Optionally print to the console for local execution

# Function to log and print the output of show(5) from DataFrame in a table format
def log_show_output(spark_df=None, title=None):
    if spark_df and title:
        logger.info(f"=== {title} ===")
        # Get the column names from the DataFrame
        columns = spark_df.columns
        # Get the top 5 rows
        rows = spark_df.take(5)
        # Format the rows into a list of lists, ready for tabulation
        formatted_rows = [list(row) for row in rows]
        # Log the table as a string using tabulate
        table = tabulate(formatted_rows, headers=columns, tablefmt="grid")
        # Log and print the table
        logger.info(table)
        #print(table)  # Optionally print to the console for local execution

In [6]:
import os
import boto3
import pandas as pd
import io  # Import the io module to use StringIO
from pyspark.sql import SparkSession

# # Set up AWS credentials as environment variables (this can be done securely)
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_DEFAULT_REGION'] = ''  # Replace with your AWS region

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("BDA Mini Project") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.kryoserializer.buffer", "512m") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.default.parallelism", "100") \
    .getOrCreate()

# Use Boto3 to interact with S3 (initialize a session)
s3_client = boto3.client('s3')

# Define the S3 bucket and file
bucket_name = 'rmarathe-raw-data'
file_key = 'car_dataset.csv'

# Generate a signed URL to access the S3 file
s3_url = f"s3://{bucket_name}/{file_key}"

# Read the file into a pandas DataFrame (using boto3 to fetch it)
# Using boto3's s3 client to get the file as a CSV
obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
data = obj['Body'].read().decode('utf-8')  # Read the file as a string

# Use io.StringIO to read the string data as if it were a file
df_pandas = pd.read_csv(io.StringIO(data))  # Convert to pandas DataFrame

# Convert pandas DataFrame to PySpark DataFrame
df = spark.createDataFrame(df_pandas)
# Get the shape (row count and column count)
row_count = df.count()  # Number of rows
column_count = len(df.columns)  # Number of columns
# Log the shape of the DataFrame
log_message(message=f"Shape of the DataFrame: ({row_count}, {column_count})")
# Log and print the top 5 rows of the DataFrame as a table
log_show_output(spark_df=df, title="Top 5 Rows of the DataFrame")


INFO:botocore.credentials:Found credentials in environment variables.
INFO:root:Shape of the DataFrame: (30781, 26)
INFO:root:=== Top 5 Rows of the DataFrame ===
INFO:root:+------------+--------------------------------------------------------------------------------------------+------------------+----------------------------------+---------+--------+----------------+---------------------+-------------+-------------+--------+------------+----------------+----------------+-------------------+---------+-----------+----------+---------------+---------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Drop the specified columns from the DataFrame
df = df.drop('url', 'image_url','region', 'region_url', 'VIN', 'ID','description','drive','fuel','title_status')

In [8]:
# Step 1: Define the fraction of the data you want to sample
# For example, if you want to sample 10% of the data:
fraction = 0.4  # Adjust this based on your needs, 0.1 means 10% of the data

# Step 2: Perform the random sampling
df = df.sample(fraction=fraction, seed=42)  # Set a seed for reproducibility

# Step 3: Check the number of rows after sampling
row_count = df.count()
print(f"Rows after random sampling: {row_count}")
log_show_output(spark_df=df, title="Top 5 Rows of the DataFrame")


INFO:root:=== Top 5 Rows of the DataFrame ===


Rows after random sampling: 12349


INFO:root:+---------+--------+----------------+---------------+-------------+-------------+------------+----------------+-----------+--------+---------------+----------+---------+---------+-----------+--------------------------+
|   price |   year | manufacturer   | model         | condition   | cylinders   |   odometer | transmission   | size      | type   | paint_color   |   county | state   |     lat |      long | posting_date             |
|    7990 |   2012 | nissan         | juke          | excellent   | 4 cylinders |     120056 | automatic      | mid-size  | SUV    | white         |      nan | fl      | 28.027  |  -82.4599 | 2021-04-23T14:45:23-0400 |
+---------+--------+----------------+---------------+-------------+-------------+------------+----------------+-----------+--------+---------------+----------+---------+---------+-----------+--------------------------+
|   18980 |   2013 | subaru         | impreza wrx   | NaN         | 4 cylinders |     100724 | manual         | Na

In [9]:
from pyspark.sql.functions import col, sum as _sum, when, isnan

# Replace non-standard null values with None
df = df.replace(["", "N/A", "None", "null","NaN"], None)

# Check for null and NaN counts for each column
null_counts = df.select([
    _sum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)).alias(c)
    for c in df.columns
])
null_counts_row = null_counts.collect()[0].asDict()
log_show_output(spark_df=df, title="Top 5 Rows of the DataFrame")
log_message(message=null_counts_row)

INFO:root:=== Top 5 Rows of the DataFrame ===
INFO:root:+---------+--------+----------------+---------------+-------------+-------------+------------+----------------+-----------+--------+---------------+----------+---------+---------+-----------+--------------------------+
|   price |   year | manufacturer   | model         | condition   | cylinders   |   odometer | transmission   | size      | type   | paint_color   |   county | state   |     lat |      long | posting_date             |
|    7990 |   2012 | nissan         | juke          | excellent   | 4 cylinders |     120056 | automatic      | mid-size  | SUV    | white         |      nan | fl      | 28.027  |  -82.4599 | 2021-04-23T14:45:23-0400 |
+---------+--------+----------------+---------------+-------------+-------------+------------+----------------+-----------+--------+---------------+----------+---------+---------+-----------+--------------------------+
|   18980 |   2013 | subaru         | impreza wrx   |             | 

In [10]:
from pyspark.sql.functions import col, sum as _sum, when, isnan
# Get the total number of rows in the DataFrame
total_rows = df.count()
# Calculate the percentage of nulls and NaNs in each column
null_percentage = (
    df.select([
        (_sum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)) / total_rows).alias(c)
        for c in df.columns
    ])
)
# Collect the percentages into a dictionary for evaluation
null_percentage_dict = null_percentage.collect()[0].asDict()
# Identify columns to drop (null percentage > 40%)
columns_to_drop = [col for col, perc in null_percentage_dict.items() if perc > 0.4]
# Drop the identified columns
df_cleaned = df.drop(*columns_to_drop)
# Log the dropped columns and cleaned DataFrame schema
log_message(message=f"Columns dropped: {columns_to_drop}")
log_message(message=df_cleaned.printSchema())  # Log the schema directly without embedding in log_message

INFO:root:Columns dropped: ['condition', 'cylinders', 'size', 'county']


root
 |-- price: long (nullable = true)
 |-- year: double (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- odometer: double (nullable = true)
 |-- transmission: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- posting_date: string (nullable = true)



In [11]:
# Get the shape of the DataFrame after dropping the columns
num_rows = df.count()
num_columns = len(df.columns)
log_message(message=f"Shape of the dataset after dropping columns: ({num_rows}, {num_columns})")

INFO:root:Shape of the dataset after dropping columns: (12349, 16)


In [12]:
df_cleaned = df_cleaned.dropna()
# Get the shape of the DataFrame after dropping the columns
num_rows = df_cleaned.count()
num_columns = len(df_cleaned.columns)
log_message(message=f"Shape of the dataset after dropping columns: ({num_rows}, {num_columns})")

INFO:root:Shape of the dataset after dropping columns: (7244, 12)


In [13]:
from pyspark.sql.functions import col
# Cast `price` and `odometer` to float
columns_to_cast_float = ["price", "odometer"]
for column in columns_to_cast_float:
    df_cleaned = df_cleaned.withColumn(column, col(column).cast("float"))
# Extract numeric part from `cylinders` and cast `year` and `cylinders` to integer
df_cleaned = df_cleaned.withColumn("year", col("year").cast("int"))
log_message(message=df_cleaned.printSchema())
log_show_output(spark_df=df_cleaned, title="Top 5 Rows of the DataFrame after type casting")

INFO:root:=== Top 5 Rows of the DataFrame after type casting ===


root
 |-- price: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- odometer: float (nullable = true)
 |-- transmission: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- posting_date: string (nullable = true)



INFO:root:+---------+--------+----------------+---------------+------------+----------------+--------+---------------+---------+---------+-----------+--------------------------+
|   price |   year | manufacturer   | model         |   odometer | transmission   | type   | paint_color   | state   |     lat |      long | posting_date             |
|    7990 |   2012 | nissan         | juke          |     120056 | automatic      | SUV    | white         | fl      | 28.027  |  -82.4599 | 2021-04-23T14:45:23-0400 |
+---------+--------+----------------+---------------+------------+----------------+--------+---------------+---------+---------+-----------+--------------------------+
|   18980 |   2013 | subaru         | impreza wrx   |     100724 | manual         | wagon  | silver        | wa      | 47.6929 | -117.412  | 2021-05-04T18:00:56-0700 |
+---------+--------+----------------+---------------+------------+----------------+--------+---------------+---------+---------+-----------+----------

In [14]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from geopy.geocoders import Nominatim

# Define the UDF to get ZIP code
@udf(StringType())
def get_zip_code(lat, lng):
    if lat is None or lng is None:  # Check for NoneType values
        return None
    try:
        geolocator = Nominatim(user_agent="geoapi")
        location = geolocator.reverse((float(lat), float(lng)), timeout=10)
        if location and 'postcode' in location.raw['address']:
            return location.raw['address']['postcode']
        return None
    except Exception:
        return None

# Filter rows where lat or long is null (optional but recommended)
df_cleaned = df_cleaned.filter((col("lat").isNotNull()) & (col("long").isNotNull()))
# Apply the UDF to add the zip_code column
df_cleaned = df_cleaned.withColumn("zip_code", get_zip_code(col("lat"), col("long")))
df_cleaned = df_cleaned.withColumn("zip_code", col("zip_code").cast("int"))
# Drop the lat and long columns
df_cleaned = df_cleaned.drop("lat", "long")
# Show the resulting DataFrame
log_show_output(spark_df=df_cleaned.select("zip_code"), title="Top 5 Rows extracting the zipcode from lat long - FEATURE ENGINEERING")

INFO:root:=== Top 5 Rows extracting the zipcode from lat long - FEATURE ENGINEERING ===
INFO:root:+------------+
|   zip_code |
|      33604 |
+------------+
|      99207 |
+------------+
|      99207 |
+------------+
|      45215 |
+------------+
|       2914 |
+------------+


In [15]:
from pyspark.sql import functions as F
# Concatenate 'manufacturer' and 'model' into a new column 'car_name'
df_cleaned = df_cleaned.withColumn(
    "car_name", F.concat(F.col("manufacturer"), F.lit(" "), F.col("model"))
)
df_cleaned = df_cleaned.drop('model')
# Show the resulting DataFrame with the new 'car_name' column
log_show_output(spark_df=df_cleaned.select("car_name"), title="Top 5 Rows with car_name column")

INFO:root:=== Top 5 Rows with car_name column ===
INFO:root:+--------------------+
| car_name           |
| nissan juke        |
+--------------------+
| subaru impreza wrx |
+--------------------+
| ford f-150         |
+--------------------+
| gmc sierra         |
+--------------------+
| ram 1500 big horn  |
+--------------------+


In [16]:
log_message(message=df_cleaned.printSchema())

root
 |-- price: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- odometer: float (nullable = true)
 |-- transmission: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- state: string (nullable = true)
 |-- posting_date: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- car_name: string (nullable = true)



In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.sql import functions as F

# Create a Tokenizer
tokenizer = Tokenizer(inputCol="car_name", outputCol="car_name_tokens")

# Create a CountVectorizer
cv = CountVectorizer(inputCol="car_name_tokens", outputCol="car_name_vec", vocabSize=1000, minDF=2.0)

# Create a pipeline
pipeline = Pipeline(stages=[tokenizer, cv])

# Fit and transform in one step
df_vectorized = pipeline.fit(df_cleaned).transform(df_cleaned)

# Select only necessary columns
df_cleaned = df_vectorized.select("*")
df_cleaned = df_cleaned.drop('car_name', 'car_name_tokens')
print(df_cleaned.columns)
# Show the result
log_show_output(spark_df=df_cleaned.select("car_name_vec"), title="Top 5 Rows with car_name vector - FEATURE ENGINEERING")


INFO:root:=== Top 5 Rows with car_name vector - FEATURE ENGINEERING ===


['price', 'year', 'manufacturer', 'odometer', 'transmission', 'type', 'paint_color', 'state', 'posting_date', 'zip_code', 'car_name_vec']


INFO:root:+----------------------------------------+
| car_name_vec                           |
| (996,[8,338],[1.0,1.0])                |
+----------------------------------------+
| (996,[20,160,190],[1.0,1.0,1.0])       |
+----------------------------------------+
| (996,[0,22],[1.0,1.0])                 |
+----------------------------------------+
| (996,[12,21],[1.0,1.0])                |
+----------------------------------------+
| (996,[4,13,207,242],[1.0,1.0,1.0,1.0]) |
+----------------------------------------+


In [18]:
df_cleaned.columns

['price',
 'year',
 'manufacturer',
 'odometer',
 'transmission',
 'type',
 'paint_color',
 'state',
 'posting_date',
 'zip_code',
 'car_name_vec']

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# List of columns for StringIndexer
columns_to_index = ['transmission', 'paint_color', 'state', 'manufacturer']

# Create StringIndexer stages for all columns
indexer_stages = [StringIndexer(inputCol=col, outputCol=col + "_indexed", handleInvalid="keep") for col in columns_to_index]

# Create a pipeline with all StringIndexer stages
pipeline = Pipeline(stages=indexer_stages)

# Fit the pipeline and transform the DataFrame
model = pipeline.fit(df_cleaned)
df_cleaned = model.transform(df_cleaned)

# Output the columns of the transformed DataFrame
log_message(message=df_cleaned.columns)

INFO:root:['price', 'year', 'manufacturer', 'odometer', 'transmission', 'type', 'paint_color', 'state', 'posting_date', 'zip_code', 'car_name_vec', 'transmission_indexed', 'paint_color_indexed', 'state_indexed', 'manufacturer_indexed']


In [20]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vector

# Define a UDF to sum the vector values
def sum_vector(vec):
    if vec is not None:
        return float(sum(vec.values))  # Sum the values in the sparse vector
    return 0.0

# Register the UDF
sum_vector_udf = udf(sum_vector, DoubleType())

# Apply the UDF to create a new column with the sum of the vector values
final_df = df_cleaned.withColumn("car_name_sum", sum_vector_udf("car_name_vec"))
final_df = final_df.drop('car_name_vec')
# Show the resulting DataFrame with the new numeric column
log_show_output(spark_df=final_df.select("car_name_sum"), title="Top 5 Rows with car_name sum - FEATURE ENGINEERING")


INFO:root:=== Top 5 Rows with car_name sum - FEATURE ENGINEERING ===
INFO:root:+----------------+
|   car_name_sum |
|              2 |
+----------------+
|              3 |
+----------------+
|              2 |
+----------------+
|              2 |
+----------------+
|              4 |
+----------------+


In [21]:
row_count = final_df.count()  # Number of rows
column_count = len(final_df.columns)  # Number of columns
log_message(message=f"Shape of the DataFrame after handling nulls and stringtypes: ({row_count}, {column_count})")

INFO:root:Shape of the DataFrame after handling nulls and stringtypes: (7244, 15)


In [22]:
log_message(message=final_df.columns)

INFO:root:['price', 'year', 'manufacturer', 'odometer', 'transmission', 'type', 'paint_color', 'state', 'posting_date', 'zip_code', 'transmission_indexed', 'paint_color_indexed', 'state_indexed', 'manufacturer_indexed', 'car_name_sum']


In [None]:
numeric_inputs = ['odometer','car_name_sum']
# Create a dictionary to store the quantiles (1st and 99th percentiles)
d = {}
for col_name in numeric_inputs:
    # Calculate the 1st and 99th percentiles for each column
    d[col_name] = final_df.approxQuantile(col_name, [0.01, 0.99], 0.25)

# Handle skewness and outliers
for col_name in numeric_inputs:
    # Calculate skewness for each numeric column
    skew = final_df.agg(skewness(col(col_name))).collect()[0][0]

    # Clip values to handle outliers (below 1st quantile and above 99th quantile)
    clipped_col = when(final_df[col_name] < d[col_name][0], d[col_name][0])\
                  .when(final_df[col_name] > d[col_name][1], d[col_name][1])\
                  .otherwise(final_df[col_name])

    # If skewness is positive (right skew), apply log transformation
    if skew > 1:
        df = final_df.withColumn(col_name, log(clipped_col + 1).alias(col_name))  # Apply log transformation
        print(f"{col_name} has been treated for positive (right) skewness. (skew = {skew})")

    # If skewness is negative (left skew), apply exponential transformation
    elif skew < -1:
        df = final_df.withColumn(col_name, exp(clipped_col).alias(col_name))  # Apply exponential transformation
        print(f"{col_name} has been treated for negative (left) skewness. (skew = {skew})")

df.show()
# Write the transformed DataFrame to a new CSV file
df.write.csv("/content/transformed_data.csv", header=True, mode="overwrite")
print("Transformed data saved to transformed_data.csv")
spark.stop()

In [23]:
from pyspark.sql import functions as F

# Open a file to write the results
with open('/content/aggregations_results.txt', 'w') as file:

    # Select the first 1000 rows from the final DataFrame
    df_subset = final_df.limit(1000)

    # Select required columns for further analysis
    required_columns = [
        "price", "manufacturer", "odometer", "paint_color", "state", "zip_code"
    ]

    # Filter the dataframe to include only the necessary columns and manufacturers of interest
    filtered_df = df_subset.select(*required_columns) \
        .filter(F.col("manufacturer").isin("audi", "honda", "bmw"))

    # Aggregation 1: Average Price by Paint Color
    file.write("Aggregation 1: Average Price by Paint Color\n")
    paint_color_avg_price_agg = filtered_df.groupBy("paint_color") \
        .agg(F.avg("price").alias("avg_price"))
    paint_color_avg_price_agg.show(truncate=False)
    paint_color_avg_price_agg.collect()  # Collecting results for writing
    file.write(str(paint_color_avg_price_agg.collect()) + '\n\n')  # Write to file

    # Aggregation 2: Min and Max Price by Manufacturer
    file.write("Aggregation 2: Min and Max Price by Manufacturer\n")
    manufacturer_min_max_price_agg = filtered_df.groupBy("manufacturer") \
        .agg(
            F.max("price").alias("max_price")
        )
    manufacturer_min_max_price_agg.show(truncate=False)
    manufacturer_min_max_price_agg.collect()  # Collecting results for writing
    file.write(str(manufacturer_min_max_price_agg.collect()) + '\n\n')  # Write to file

    # Aggregation 3: Average Odometer by Manufacturer
    file.write("Aggregation 3: Average Odometer by Manufacturer\n")
    odometer_avg_agg = filtered_df.groupBy("manufacturer") \
        .agg(F.avg("odometer").alias("avg_odometer"))
    odometer_avg_agg.show(truncate=False)
    odometer_avg_agg.collect()  # Collecting results for writing
    file.write(str(odometer_avg_agg.collect()) + '\n\n')  # Write to file

    # Aggregation 4: Count of Cars by State with Zip Code reference
    file.write("Aggregation 4: Count of Cars by State with Zip Code reference\n")
    state_car_count_agg = filtered_df.groupBy("state") \
        .agg(
            F.count("*").alias("car_count"),
            F.first("zip_code").alias("zip_code")  # Fetch a zip code per state
        ) \
        .orderBy(F.desc("car_count"))
    state_car_count_agg.show()
    state_car_count_agg.collect()  # Collecting results for writing
    file.write(str(state_car_count_agg.collect()) + '\n\n')  # Write to file

    # Aggregation 5: Count of Cars by Paint Color
    file.write("Aggregation 5: Count of Cars by Paint Color\n")
    paint_color_count_agg = filtered_df.groupBy("paint_color") \
        .agg(F.count("*").alias("car_count"))
    paint_color_count_agg.show(truncate=False)
    paint_color_count_agg.collect()  # Collecting results for writing
    file.write(str(paint_color_count_agg.collect()) + '\n\n')  # Write to file

print("Aggregations have been written to 'aggregations_results.txt'.")


+-----------+------------------+
|paint_color|avg_price         |
+-----------+------------------+
|grey       |14003.5           |
|black      |19922.235294117647|
|silver     |16519.619047619046|
|white      |19797.58823529412 |
|blue       |17254.260869565216|
|red        |20205.444444444445|
|green      |8627.0            |
|brown      |9080.0            |
+-----------+------------------+

+------------+---------+
|manufacturer|max_price|
+------------+---------+
|audi        |48000.0  |
|bmw         |52590.0  |
|honda       |39950.0  |
+------------+---------+

+------------+-----------------+
|manufacturer|avg_odometer     |
+------------+-----------------+
|audi        |67914.34782608696|
|bmw         |72961.41025641025|
|honda       |113351.25        |
+------------+-----------------+

+-----+---------+--------+
|state|car_count|zip_code|
+-----+---------+--------+
|   ca|       16|   93117|
|   fl|       10|   34104|
|   oh|        8|   43229|
|   wi|        7|   54703|
|   tx