Skip to content

Commit

Permalink
Add Socrata Data Nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
UrbanGISer committed Nov 14, 2023
1 parent 3fbba3d commit 3cb837e
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 0 deletions.
1 change: 1 addition & 0 deletions knime_extension/geospatial_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ dependencies:
- pointpats=2.3.0
- pip:
- ipinfo==4.4.3
- sodapy==2.2.0
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
157 changes: 157 additions & 0 deletions knime_extension/src/nodes/opendata.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,160 @@ def execute(self, exec_context: knext.ExecutionContext):
gdf = get_osmnx().geocode_to_gdf(self.placename)
gdf = gdf.reset_index(drop=True)
return knext.Table.from_pandas(gdf)


############################################
# Socrata Search
############################################
@knext.node(
    name="Socrata Search",
    node_type=knext.NodeType.SOURCE,
    icon_path=__NODE_ICON_PATH + "Socrata Search.png",
    category=__category,
    after="",
)
@knext.output_table(
    name="Socrata dataset list",
    description="Socrata dataset based on search keywords",
)
class SocrataSearchNode:
    """Retrieve the open data catalog via the Socrata API.

    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to get the dataset list.
    """

    queryitem = knext.StringParameter(
        label="Input searching item",
        description="""Enter search keywords or dataset names to find relevant datasets in the Socrata database.
        This search is not case-sensitive and can include multiple words separated by spaces. """,
        default_value="Massachusetts",
    )

    def configure(self, configure_context):
        # TODO Create combined schema
        # The output columns depend on the catalog response, so no schema is
        # announced at configure time.
        return None

    def execute(self, exec_context: knext.ExecutionContext):
        """Query the Socrata catalog and return the matching datasets as a table.

        The catalog endpoint is called with the configured search keywords,
        the ``results`` array of the JSON response is flattened into a
        DataFrame, list-valued cells are joined into comma-separated strings,
        and every column that cannot be converted into a KNIME table is
        dropped before the final table is built.

        Returns:
            A ``knext.Table`` built from the cleaned DataFrame.

        Raises:
            urllib.error.URLError: If the catalog request fails.
        """
        import json
        from urllib.parse import quote, urlencode
        from urllib.request import Request, urlopen

        import pandas as pd
        from pandas import json_normalize

        # Percent-encode the query so that keywords containing spaces or
        # reserved characters ("&", "#", non-ASCII text, ...) still form a
        # valid URL; a raw space makes urlopen reject the request.
        query_string = urlencode(
            {"q": self.queryitem, "only": "datasets", "limit": 10000},
            quote_via=quote,
        )
        request = Request(f"http://api.us.socrata.com/api/catalog/v1?{query_string}")

        # The context manager closes the HTTP connection even if reading fails.
        with urlopen(request) as response:
            response_body = response.read()

        # Load the JSON response into a Python dictionary
        data = json.loads(response_body)

        # Extract the "results" key, which contains the dataset information.
        # A payload without it is treated as an empty result set.
        dataset_info = data.get("results", [])

        # Create a DataFrame from the dataset information, and flatten the nested dictionaries
        df = json_normalize(dataset_info)
        # errors="ignore": these columns are missing when the search returns
        # no rows, and a missing column must not abort the node.
        df = df.drop(
            columns=["classification.domain_tags", "classification.domain_metadata"],
            errors="ignore",
        )

        def join_if_list(value):
            # Lists of strings become one comma-separated string; any other
            # value is passed through unchanged.
            return ", ".join(value) if isinstance(value, list) else value

        # Find the columns that contain at least one list value
        list_columns = [
            col for col in df.columns if any(isinstance(item, list) for item in df[col])
        ]

        # Flatten list columns; a list holding non-string items (e.g. dicts)
        # cannot be joined, so such a column is dropped instead.
        for col in list_columns:
            try:
                df[col] = df[col].apply(join_if_list)
            except TypeError:
                df.drop(columns=[col], inplace=True)

        # Drop columns that cannot be saved in KNIME
        drop_columns = []
        for col in df.columns:
            try:
                # Attempt to convert the column to a KNIME-compatible data type
                knext.Table.from_pandas(df[[col]])
            except Exception:
                # Best effort: any conversion failure marks the column for removal
                drop_columns.append(col)

        df.drop(columns=drop_columns, inplace=True)

        # Treat "?" and empty strings as missing, then remove columns that
        # carry no values at all.
        df.replace("?", pd.NA, inplace=True)
        df.replace("", pd.NA, inplace=True)
        df.dropna(axis=1, how="all", inplace=True)
        df = df.reset_index(drop=True)
        return knext.Table.from_pandas(df)


############################################
# Socrata Data Query
############################################
@knext.node(
    name="Socrata Data Query",
    node_type=knext.NodeType.SOURCE,
    icon_path=__NODE_ICON_PATH + "Socrata Data Query.png",
    category=__category,
    after="",
)
@knext.output_table(
    name="Socrata dataset",
    description="Socrata dataset based on search keywords",
)
class SocrataDataNode:
    """Retrieve an open dataset via the Socrata API.

    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to get the dataset from a dataset list generated by Socrata Search Node.
    For instance, this dataset [Incidence Rate Of Breast Cancer](https://opendata.utah.gov/Health/Incidence-Rate-Of-Breast-Cancer-Per-100-000-All-St/q22t-rbk9) has a resource_id of "q22t-rbk9" and a metadata domain of "opendata.utah.gov".
    They can be found in the link under API, "https://opendata.utah.gov/resource/q22t-rbk9.json". Both items will be used for data retrieval.
    """

    metadata_domain = knext.StringParameter(
        label="Metadata domain",
        description="""The value in the column metadata.domain of a table generated by a Socrata Search node. """,
        default_value="",
    )

    resource_id = knext.StringParameter(
        label="Resource ID",
        description="""The value in the column resource.id of a table generated by a Socrata Search node. """,
        default_value="",
    )

    def configure(self, configure_context):
        # TODO Create combined schema
        # The output columns depend on the requested dataset, so no schema is
        # announced at configure time.
        return None

    def execute(self, exec_context: knext.ExecutionContext):
        """Download the configured Socrata dataset and return it as a table.

        Returns:
            A ``knext.Table`` built from the records returned by the API.

        Raises:
            ValueError: If the metadata domain or the resource ID is empty.
        """
        import pandas as pd
        from sodapy import Socrata

        domain = self.metadata_domain.strip()
        dataset_id = self.resource_id.strip()
        # Both parameters default to "", so fail early with an actionable
        # message instead of an opaque client or HTTP error.
        if not domain or not dataset_id:
            raise ValueError(
                "Both the metadata domain and the resource ID must be provided."
            )

        # Unauthenticated client only works with public data sets. Note 'None'
        # in place of application token, and no username or password.
        # Non-public datasets would need an authenticated client, e.g.
        # Socrata(domain, app_token, username=..., password=...).
        # The context manager closes the underlying HTTP session when done.
        with Socrata(domain, None) as client:
            # Up to 100000 records, returned as JSON from the API and
            # converted to a Python list of dictionaries by sodapy.
            results = client.get(dataset_id, limit=100000)

        # Convert to pandas DataFrame
        results_df = pd.DataFrame.from_records(results)

        return knext.Table.from_pandas(results_df)

0 comments on commit 3cb837e

Please sign in to comment.