In [0]:
from datetime import datetime

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [0]:
# Three-level location of the input table (CATALOG.SCHEMA.SOURCE_TABLE); the
# notebook's output tables are written into the same catalog and schema.
CATALOG, SCHEMA, SOURCE_TABLE = "marcell", "marine_planning", "project_pages"

In [0]:
# Per-run identifier of the form "<YYYYmmddHHMMSS>_<current user>", built from the
# driver's local clock and the Spark session's current_user().
# NOTE(review): RUN_ID is not referenced anywhere else in the visible notebook —
# TODO confirm whether it is still needed.
time_now = datetime.now()
current_user = spark.sql("SELECT current_user()").first()[0]
RUN_ID = "{}_{}".format(time_now.strftime('%Y%m%d%H%M%S'), current_user)

In [0]:
# Extraction prompt for the per-page ai_query call; the page text is appended
# directly after the trailing "Here is the page:" line.
# NOTE(review): the ai_query cell below splices this text into a double-quoted SQL
# string literal, so any double quote or backslash added here needs SQL escaping.
# NOTE(review): the template uses single-quoted keys, which is not valid JSON, while
# the rules demand strictly valid JSON; if the model copies the single quotes,
# from_json will likely yield no impacts for that page — TODO check real outputs.
# NOTE(review): `prompt` is reassigned later in the notebook for the summary step,
# so re-running the extraction query cell after that point would send the wrong text.
prompt = """
Role: Act as a senior environmental data analyst with expertise in parsing technical policy documents.

Task: Analyze the provided policy paper page and extract verified quantitative impacts on species/conservation areas using this structure:

{
  'impacts': [
    {
      'species': 'common name of the animal species impacted',
      'impact': 'the adverse effect identified on the species, e.g. collision mortality',
      'impact_type': 'whether the impact is due to this project alone (independent), or in combination with other projects (in-combination)',
      'impact_category': 'one of: Collision, Displacement/Disturbance, Mortality, Population/Breeding, Noise, Other',
      'area': 'Official designated conservation area name',
      'reporting_organisation': 'Full organization name (no abbreviations), e.g. Natural England (NE) or the Secretary of State (SoS)',
      'value': 'Exact numerical figure from source, if a single value is mentioned',
      'minimum_value': 'The minimum value, if a range is presented',
      'maximum_value': 'The maximum value, if a range is presented',
      'unit': 'Measurement unit (e.g., individuals, % population)',
      'confidence': 'Confidence level (confirmed, suspected, confirmed beyond any scientific doubt)',
      'quote': 'Direct quote from the text referencing the impact'
    }
  ]
}

Rules:

1. Include ONLY impacts with quantitative values explicitly stated in the text

2. Verify organization names against official registers

3. Use original measurement units - convert percentages only if explicitly stated

4. Omit impacts described with speculative language ('may affect', 'potentially')

5. Return empty array if no verifiable impacts exist, without returning any other text

Format Requirements:

- Strictly valid JSON (no trailing commas, exact quotes)

- No Markdown formatting

- the JSON should be machine readable, so DON'T use the ```json markdown format

- Preserve case sensitivity in names

- Empty fields permitted - if information for a field is missing, keep the field as an empty string

Process:

1. Identify all species-conservation area pairs

2. Cross-reference with reporting organizations

3. Extract exact numerical values + context units

4. Validate against document's explicit statements.

Here is the page:

"""

# Model-serving endpoint passed as the first argument to every ai_query call below.
model = "databricks-claude-3-7-sonnet"

In [0]:
# Run the extraction prompt over every page with ai_query (lazily: nothing executes
# until the result is written). The model name and prompt are spliced into
# double-quoted SQL string literals, so backslashes and double quotes are escaped
# first; otherwise any `"` or `\` in the prompt would break or silently alter the SQL.
_extraction_model_sql = model.replace("\\", "\\\\").replace('"', '\\"')
_extraction_prompt_sql = prompt.replace("\\", "\\\\").replace('"', '\\"')
query = f'''
SELECT *,
ai_query("{_extraction_model_sql}", "{_extraction_prompt_sql}" || page_text) as ai_result
FROM {CATALOG}.{SCHEMA}.{SOURCE_TABLE}
'''

df = spark.sql(query)

In [0]:
import pyspark.sql.functions as F

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Expected structure of each ai_query reply: {"impacts": [ {<field>: <string>, ...}, ... ]}.
# Every field is a nullable string, mirroring the keys requested in the extraction prompt.
schema = StructType([
    StructField("impacts", ArrayType(StructType([
        StructField("species", StringType(), True),
        StructField("impact", StringType(), True),
        StructField("impact_type", StringType(), True),
        StructField("impact_category", StringType(), True),
        StructField("area", StringType(), True),
        StructField("reporting_organisation", StringType(), True),
        StructField("value", StringType(), True),
        StructField("minimum_value", StringType(), True),
        StructField("maximum_value", StringType(), True),
        StructField("unit", StringType(), True),
        StructField("confidence", StringType(), True),
        StructField("quote", StringType(), True)
    ])), True)
])

# The prompt forbids Markdown fences, but a reply wrapped in ```json ... ``` cannot be
# parsed by from_json and would lose every impact on that page. Strip a leading
# ``` / ```json and a trailing ``` before parsing; unfenced replies are unaffected.
parsed_df = df.withColumn(
    "ai_result_parsed",
    from_json(
        F.regexp_replace(col("ai_result"), r"(?i)^\s*```(?:json)?\s*|\s*```\s*$", ""),
        schema,
    ),
)
parsed_df = parsed_df.withColumn("impacts", col("ai_result_parsed.impacts"))
# Persist the raw reply, the parsed struct and the impacts array for every page.
parsed_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{CATALOG}.{SCHEMA}.project_impact")

In [0]:
from pyspark.sql.functions import explode

# Reload the parsed pages and keep those with at least one extracted impact.
# size(...) > 0 replaces getItem(0).isNotNull(): indexing element 0 of an empty array
# can raise under ANSI mode, and a NULL first element wrongly discarded the page's
# remaining impacts. size() of a NULL array is -1 or NULL depending on configuration,
# so NULL arrays are excluded either way.
parsed_df = spark.table(f"{CATALOG}.{SCHEMA}.project_impact").filter(F.size(col("impacts")) > 0)

# One output row per impact; the struct is exposed as `impact` next to the page columns.
exploded_df = parsed_df.select("*", explode(col("impacts")).alias("impact"))


In [0]:
# Persist one row per extracted impact (the `impact` struct alongside the page columns),
# replacing any existing table and schema; the flattening cell reads it back by name.
exploded_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{CATALOG}.{SCHEMA}.project_impact_expl")

In [0]:
# Flatten each `impact` struct into top-level columns, then drop the struct, the raw
# model reply and the intermediate parse columns. The struct's own `impact` field is
# exposed as `cause`, because `impact` is already the struct column's name.
# Reads from the configured CATALOG/SCHEMA rather than a hard-coded location, and uses
# one withColumns call instead of a long chain of withColumn projections.
df = (spark.table(f"{CATALOG}.{SCHEMA}.project_impact_expl")
      .withColumns({
          column: F.col(f"impact.{field}")
          for column, field in [
              ("species", "species"),
              ("cause", "impact"),
              ("impact_type", "impact_type"),
              ("impact_category", "impact_category"),
              ("area", "area"),
              ("reporting_organisation", "reporting_organisation"),
              ("value", "value"),
              ("minimum_value", "minimum_value"),
              ("maximum_value", "maximum_value"),
              ("unit", "unit"),
              ("confidence", "confidence"),
              ("quote", "quote"),
          ]
      })
      .drop("impact", "impacts", "ai_result_parsed", "ai_result")
)
df.display()

In [0]:
# Keep impacts that carry at least one figure (value, minimum or maximum) and a cause.
# NULL, "" and space-only strings all count as missing. Also flag whether the
# model's quote appears verbatim in the page text.
df = (df
      .filter((F.length(F.trim(F.col("value"))) > 0)
              | (F.length(F.trim(F.col("minimum_value"))) > 0)
              | (F.length(F.trim(F.col("maximum_value"))) > 0))
      .filter(F.length(F.trim(F.col("cause"))) > 0)
      # INSTR(page_text, '') returns 1, so an empty quote would "validate"; require a
      # non-blank quote before checking for a verbatim match.
      .withColumn("quote_validation", F.expr("trim(quote) <> '' AND INSTR(page_text, quote) > 0"))
      )

In [0]:
# Keep only figures reported by the Secretary of State or the applicant for this
# project alone, with confidence exactly 'confirmed'. Distance (km) and area (ha)
# units are excluded.
# The exclusion filters coalesce nullable columns first: a plain `<>` / NOT ILIKE on
# NULL evaluates to NULL, which silently dropped rows merely for lacking a unit or an
# impact_type. Those comparisons also ignore case and surrounding spaces.
# NOTE(review): `confidence = 'confirmed'` also excludes the prompt's
# 'confirmed beyond any scientific doubt' level — confirm that is intended.
df = (
  df
  .filter("reporting_organisation ILIKE '%secretary of state%' OR reporting_organisation LIKE '%SoS%' OR reporting_organisation ILIKE '%applicant%'")
  .filter("lower(trim(coalesce(impact_type, ''))) <> 'in-combination'")
  .filter("coalesce(unit, '') NOT ILIKE '%km%'")
  .filter("lower(trim(coalesce(unit, ''))) <> 'ha'")
  .filter("confidence = 'confirmed'")
)

In [0]:
@F.udf(T.StringType())
def format_value(value, min_value, max_value, unit):
  """Render an impact's figure(s) and unit as a single display string.

  Complete inputs produce:
    value only           -> "<value> <unit>"
    range only           -> "<min>-<max> <unit>"
    value and full range -> "<value> (<min>-<max>) <unit>"
  None, "" and whitespace-only inputs count as missing:
    - a missing unit is omitted rather than rendered as the text "None";
    - a lone bound with no value is kept as "at least <min>" / "up to <max>"
      instead of collapsing to " <unit>" or "None <unit>".
  Returns "" when neither a figure nor a unit is present.
  """
  def present(x):
    return x is not None and str(x).strip() != ""

  if present(value) and present(min_value) and present(max_value):
    figure = f"{value} ({min_value}-{max_value})"
  elif present(value):
    figure = f"{value}"
  elif present(min_value) and present(max_value):
    figure = f"{min_value}-{max_value}"
  elif present(min_value):
    figure = f"at least {min_value}"
  elif present(max_value):
    figure = f"up to {max_value}"
  else:
    figure = ""

  unit_text = str(unit) if present(unit) else ""
  return " ".join(part for part in (figure, unit_text) if part)

In [0]:
# Add a human-readable "<figure> <unit>" column via the format_value UDF.
df = df.withColumn(
  "formatted_value",
  format_value(
    F.col("value"),
    F.col("minimum_value"),
    F.col("maximum_value"),
    F.col("unit"),
  ),
)

In [0]:
# Preview the rows that survived the filters, including the formatted_value column.
df.display()

In [0]:
@F.udf(T.StringType())
def create_json(row):
  """Serialise one impact fact (a struct row) as a JSON object string.

  `row` must expose the fields species, cause, formatted_value, impact_category,
  area, reporting_organisation, quote and page_number. `cause` is emitted under the
  key "impact" and `formatted_value` under "value", keeping the labels this fact text
  has always used.

  Emits real JSON, as the function's name and the summary prompt ("JSON dataset")
  both promise. Quotes containing newlines or colons stay unambiguous, and missing
  fields become null instead of the text "None". Values json cannot encode natively
  (e.g. a Decimal page number) are rendered with str().
  """
  import json  # json is not imported at the top of the notebook

  fact = {
    "species": row.species,
    "impact": row.cause,
    "value": row.formatted_value,
    "impact_category": row.impact_category,
    "area": row.area,
    "reporting_organisation": row.reporting_organisation,
    "quote": row.quote,
    "page_number": row.page_number,
  }
  return json.dumps(fact, ensure_ascii=False, indent=2, default=str)

In [0]:
# Serialise each filtered impact (with its page number) into the fact text used as
# input to the summary prompt, then preview the result.
df = df.withColumn(
  "fact_json",
  create_json(
    F.struct(
      "species",
      "cause",
      "formatted_value",
      "impact_category",
      "area",
      "reporting_organisation",
      "quote",
      "page_number",
    )
  ),
)
df.display()

In [0]:
# One row per project: `concatenated_facts` joins all of the project's fact strings
# with a visible separator. collect_list gives no ordering guarantee after a shuffle,
# so facts are sorted by page_number (ties broken by the fact text). This keeps the
# summary prompt, and therefore the LLM output, reproducible across runs.
# NOTE(review): if page_number is a string column the sort is lexicographic ("10" < "2").
df_facts = (
    df.groupBy("project")
    .agg(F.array_sort(F.collect_list(F.struct("page_number", "fact_json"))).alias("facts"))
    .select(
        "project",
        F.concat_ws(
            "\n\n----------------\n\n",
            F.transform("facts", lambda fact: fact.getField("fact_json")),
        ).alias("concatenated_facts"),
    )
)

In [0]:
# Preview one concatenated fact string per project.
df_facts.display()

In [0]:
# Summary prompt for the per-project ai_query call; the concatenated fact strings
# are appended directly after the trailing "Here are the facts:" line.
# NOTE(review): this reassigns `prompt`, replacing the page-extraction prompt defined
# earlier; re-running the extraction query cell after this one would send this text
# instead. Consider a distinct name (e.g. summary_prompt).
# NOTE(review): the example output contains illustrative figures (e.g. "1200 dB") that
# the model may echo. The text is also spliced into a double-quoted SQL literal by the
# ai_query cell, so any double quote or backslash added here needs SQL escaping.
prompt = """
Analyze the provided JSON dataset of marine impact facts from infrastructure planning documents. Generate a structured markdown report that:

1. **Groups impacts by conservation area** (primary sections)
2. **Prioritizes by highest quantitative value** within each area
3. **Highlights 3 key impact categories** (noise, habitat loss, pollution, etc.)
4. **Maintains traceability** using [p. X] citations from source documents

Include for each conservation area:
- ## Area Name 
  - ### Most Severe Impact Category
    - **Species**: Brief impact description (Value: _quantitative data_) [p. X]
    - Direct quote from report: '...' [p. X]
    - Reporting organization: _Name_
  - Comparative impact table showing:
    | Species | Impact Type | Quantitative Value | Page Ref |
    |---------|-------------|--------------------|----------|
    | _Data_  | _Data_      | _Data_             | _[p.X]_  |

Formatting rules:
- Bold category headers
- Italicize quantitative values
- Use > for direct quotes
- Sort tables by descending value
- List max 5 species per category
- Highlight duplicate impacts across areas with ⚠️

Start with '## Major Marine Impacts Summary' header. Conclude with '## Key Takeaways' listing top 3 cross-area concerns with total affected species counts.

Example output:
## Major Marine Impacts Summary

## Bristol Channel
### Noise Pollution (Highest Impact)
- **Harbor Porpoise**: 58% reduction in feeding activity (Value: _1200 dB increase_) [p. 42]
> 'Construction pile-driving will create sustained underwater noise exceeding 160dB' [p. 42]
- Reporting: Marine Conservation Society

| Species | Impact Type | Quantitative Value | Page Ref |
|---------|-------------|--------------------|----------|
| Porpoise | Hearing loss | 1200 dB | [p.42] |
| Cod | Larval mortality | 45% | [p.38] |

## Key Takeaways
1. Noise pollution affects 8 species across 3 regions 
2. Habitat fragmentation risks 12 endangered species
3. 75% of impacts occur in EU-protected zones


Here are the facts:


"""

In [0]:
# Generate one markdown summary per project with ai_query. The model name and prompt
# are spliced into double-quoted SQL string literals, so backslashes and double quotes
# are escaped first; otherwise any `"` or `\` in the prompt would break the expression.
_summary_model_sql = model.replace("\\", "\\\\").replace('"', '\\"')
_summary_prompt_sql = prompt.replace("\\", "\\\\").replace('"', '\\"')
df_facts = df_facts.withColumn(
    "ai_summary",
    F.expr(f'ai_query("{_summary_model_sql}", "{_summary_prompt_sql}" || concatenated_facts)'),
)

In [0]:
# df_facts.display()

In [0]:
# Collect the per-project rows (including ai_summary) to the driver as pandas. This is
# the first action on the ai_summary column, so the summary ai_query calls run here.
pdf_facts = df_facts.toPandas()

def save_to_md(project, text, directory="."):
  """Write one project's markdown summary to ``<directory>/<project>_summary_claude.md``.

  Args:
    project: Project name, used as the file-name prefix.
    text: Markdown to write. ``None`` (e.g. a NULL ai_summary) writes an empty file
      instead of raising, so one failed summary does not abort the remaining writes.
    directory: Output directory; defaults to the current working directory.

  Returns:
    The path of the file written.
  """
  import os

  path = os.path.join(directory, f"{project}_summary_claude.md")
  # Explicit UTF-8: summaries may contain non-ASCII text (the summary prompt asks for a
  # warning emoji), and the platform default encoding is not guaranteed to handle it.
  with open(path, "w", encoding="utf-8") as f:
    f.write("" if text is None else text)
  return path

# Write one markdown file per project. An explicit loop makes the side effect obvious,
# unlike DataFrame.apply(axis=1), which builds (and echoes as the cell output) a
# throwaway Series of return values.
for _project, _summary in zip(pdf_facts["project"], pdf_facts["ai_summary"]):
  save_to_md(_project, _summary)