In [0]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


In [0]:
# 示例URL：用真实的Trade Me API Key替换后使用（这里先用假URL测试结构）
sample_url = "https://api.trademe.co.nz/v1/Search/Property/Residential.json"

response = requests.get(sample_url)

# 检查状态码和返回结构
print("Status Code:", response.status_code)
print("Sample JSON Keys:", response.json().keys())

Status Code: 401
Sample JSON Keys: dict_keys(['Request', 'ErrorDescription'])


In [0]:
spark = SparkSession.builder.getOrCreate()

sample_data = [("123 ABC St", "Auckland", 950000, 3, 2),
               ("456 XYZ Ave", "Wellington", 870000, 2, 1)]

schema = StructType([
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("bathrooms", IntegerType(), True)
])

df = spark.createDataFrame(sample_data, schema)
df.show()

+-----------+----------+------+--------+---------+
|    address|      city| price|bedrooms|bathrooms|
+-----------+----------+------+--------+---------+
| 123 ABC St|  Auckland|950000|       3|        2|
|456 XYZ Ave|Wellington|870000|       2|        1|
+-----------+----------+------+--------+---------+



In [0]:
%pip install requests_oauthlib

Python interpreter will be restarted.
Collecting requests_oauthlib
  Downloading requests_oauthlib-2.0.0-py2.py3-none-any.whl (24 kB)
Collecting oauthlib>=3.0.0
  Downloading oauthlib-3.2.2-py3-none-any.whl (151 kB)
Installing collected packages: oauthlib, requests-oauthlib
Successfully installed oauthlib-3.2.2 requests-oauthlib-2.0.0
Python interpreter will be restarted.


In [0]:
from requests_oauthlib import OAuth1Session
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

In [0]:
CONSUMER_KEY = "104CED1C9D5DCA49008D5D9AB8E22103"
CONSUMER_SECRET = "6C5157AA1E5DB0FC4CC679077F4D50C1"
OAUTH_TOKEN = "54842151D01EE95430C5909D857D02F7"
OAUTH_TOKEN_SECRET = "832A352822810F8D2E8C33D8F7C8F4D0"

tm = OAuth1Session(
    CONSUMER_KEY,
    client_secret=CONSUMER_SECRET,
    resource_owner_key=OAUTH_TOKEN,
    resource_owner_secret=OAUTH_TOKEN_SECRET
)


In [0]:
url = "https://api.tmsandbox.co.nz/v1/Search/Property/Residential.json"
params = {
    "region": 1,  # Auckland
    "rows": 50,
    "page": 1
}

response = tm.get(url, params=params)
data = response.json()


In [0]:
listings = data.get('List', [])

# 提取字段
records = []
for listing in listings:
    records.append({
        "listing_id": listing.get("ListingId"),
        "title": listing.get("Title"),
        "price_display": listing.get("PriceDisplay"),
        "suburb": listing.get("Suburb"),
        "region": listing.get("Region"),
        "bedrooms": listing.get("Bedrooms"),
        "bathrooms": listing.get("Bathrooms"),
        "land_area": listing.get("LandArea"),
        "listing_date": listing.get("ListingGroup")
    })

# 创建DataFrame
df = spark.createDataFrame(records)
df.show(5)


+---------+--------+---------+------------+----------+--------------------+--------+---------+--------------------+
|bathrooms|bedrooms|land_area|listing_date|listing_id|       price_display|  region|   suburb|               title|
+---------+--------+---------+------------+----------+--------------------+--------+---------+--------------------+
|        2|       3|     1517|    PROPERTY|2149706648|Asking price $1,4...|Auckland| Pukekohe|XUVI9GRMVB7J09R7KIIO|
|     null|    null|     8600|    PROPERTY|2149706647|Price by negotiation|Auckland|   Karaka|NZEHGJFBF76UKR7CTNDU|
|        3|       3|     null|    PROPERTY|2149706646|Price by negotiation|Auckland|Greenlane|RKWRAECZX6TRZNL0EGJJ|
|        4|       5|      555|    PROPERTY|2149706645|Price by negotiation|Auckland|   Karaka|A07H3KAHGXNOG74P3DHR|
|        2|       4|     null|    PROPERTY|2149706644|Auction on 29 May...|Auckland|Hillcrest|J4MXOP1AWQBTLRW4EPV2|
+---------+--------+---------+------------+----------+------------------

In [0]:
df_cleaned = df.withColumn("bedrooms", col("bedrooms").cast("string")) \
               .withColumn("bathrooms", col("bathrooms").cast("string")) \
               .withColumn("land_area", col("land_area").cast("double"))

df_cleaned.printSchema()
df_cleaned.show()


root
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- land_area: double (nullable = true)
 |-- listing_date: string (nullable = true)
 |-- listing_id: long (nullable = true)
 |-- price_display: string (nullable = true)
 |-- region: string (nullable = true)
 |-- suburb: string (nullable = true)
 |-- title: string (nullable = true)

+---------+--------+---------+------------+----------+--------------------+--------+-----------+--------------------+
|bathrooms|bedrooms|land_area|listing_date|listing_id|       price_display|  region|     suburb|               title|
+---------+--------+---------+------------+----------+--------------------+--------+-----------+--------------------+
|        2|       3|   1517.0|    PROPERTY|2149706648|Asking price $1,4...|Auckland|   Pukekohe|XUVI9GRMVB7J09R7KIIO|
|     null|    null|   8600.0|    PROPERTY|2149706647|Price by negotiation|Auckland|     Karaka|NZEHGJFBF76UKR7CTNDU|
|        3|       3|     null|    PROPER

In [0]:
df_cleaned = df.selectExpr(
    "cast(listing_id as string)",
    "cast(title as string)",
    "cast(price_display as string)",
    "cast(suburb as string)",
    "cast(region as string)",
    "cast(bedrooms as string)",
    "cast(bathrooms as string)",
    "cast(land_area as string)",      # 暂时也转 string 测试
    "cast(listing_date as string)"
)


In [0]:
sfOptions = {
  "sfURL": "kcieedd-kp84242.snowflakecomputing.com",  # 不要加 https://，如 abcde-xy12345.snowflakecomputing.com
  "sfUser": "mjsnowflake",           # 注册 Snowflake 时填写的
  "sfPassword": "Snowflake850525!",         # 登录密码
  "sfDatabase": "PROPERTY_ANALYTICS",
  "sfSchema": "RAW",
  "sfWarehouse": "COMPUTE_WH",      # 默认就叫这个，如果你没改过
  "sfRole": "ACCOUNTADMIN"          # 如果你是初始用户，默认角色是这个
}


In [0]:
from pyspark.sql.functions import current_timestamp

df_final = df_cleaned.withColumn("inserted_at", current_timestamp())


In [0]:
df_final.write \
  .format("snowflake") \
  .options(**sfOptions) \
  .option("dbtable", "TRADEME_PROPERTY_LISTINGS") \
  .mode("append") \
  .save()
