In [3]:
from sympy import symbols, Eq, solve

# 定义变量
P, Q = symbols('P Q')

# 组 A 和 组 B 的需求方程
Q_A = (677 - P) / 0.4  # 组 A 的需求
Q_B = (723 - P) / 0.1  # 组 B 的需求

# 总需求为 Q_A 和 Q_B 之和
total_demand_eq = Eq(Q, Q_A + Q_B)
# total_demand = Eq(Q, (3569 - 5*P) / 0.4)


# 供给方程
supply_eq = Eq(P, 338.5 + 0.4 * Q)

# 解联立方程，求出均衡价格 P 和均衡数量 Q
equilibrium_solution = solve([supply_eq, total_demand_eq], (P, Q))

# 输出均衡价格和均衡数量
equilibrium_solution


{P: 651.250000000000, Q: 781.875000000000}

In [1]:
import pandas as pd

# 使用 chunksize 参数读取前10行
chunksize = 10  # 每次读取 10 行
chunk = pd.read_csv(r'C:\Users\lishe\Downloads\gwfziq8ptfx6csqt.csv', chunksize=chunksize)

# 获取第一个块，也就是文件的前10行
df_first_10 = next(chunk)

# 显示前10行
print(df_first_10)


  SYMBOL        DATE     TIME    BID    OFR  BIDSIZ  OFRSIZ  MODE EX  MMID
0      A  2011-10-03  4:00:00  29.49  33.50       1       1    12  P   NaN
1      A  2011-10-03  6:01:59   0.00   0.00       0       0    12  M   NaN
2      A  2011-10-03  6:47:36  30.65  33.50       1       1    12  P   NaN
3      A  2011-10-03  6:47:36  30.65  31.25       1       1    12  P   NaN
4      A  2011-10-03  6:49:04  30.66  31.25       4       1    12  P   NaN
5      A  2011-10-03  7:04:23  30.65  31.25       1       1    12  P   NaN
6      A  2011-10-03  7:04:45  30.66  31.25       4       1    12  P   NaN
7      A  2011-10-03  7:35:00  25.29   0.00       1       0    12  T   NaN
8      A  2011-10-03  7:35:00  25.29  36.72       1       1    12  T   NaN
9      A  2011-10-03  7:44:19  30.66  31.25       5       1    12  P   NaN


In [3]:
import pandas as pd
import zipfile

# 打开 ZIP 压缩包
with zipfile.ZipFile(r'C:\Users\lishe\Downloads\pe8pqmpquql8iaiz_csv.zip') as z:
    # 打开压缩包内的特定 CSV 文件
    with z.open('pe8pqmpquql8iaiz.csv') as f:
        # 只读取前 10 行
        df = pd.read_csv(f, nrows=10)

# 打印前 10 行数据
print(df)


  SYMBOL        DATE     TIME    BID    OFR  BIDSIZ  OFRSIZ  MODE EX  MMID
0      A  2011-09-01  4:00:00  32.25   0.00       2       0    12  P   NaN
1      A  2011-09-01  5:48:53  35.87   0.00       1       0    12  P   NaN
2      A  2011-09-01  5:48:55  35.87  36.79       1       1    12  P   NaN
3      A  2011-09-01  5:50:10  35.88  36.79       4       1    12  P   NaN
4      A  2011-09-01  5:50:59  35.88   0.00       4       0    12  P   NaN
5      A  2011-09-01  5:51:01  32.25   0.00       2       0    12  P   NaN
6      A  2011-09-01  5:56:11  35.87  36.79       1       1    12  P   NaN
7      A  2011-09-01  5:58:51  35.88  36.79       4       1    12  P   NaN
8      A  2011-09-01  6:01:59   0.00   0.00       0       0    12  M   NaN
9      A  2011-09-01  7:30:01   0.00  47.40       0       2    12  T   NaN


In [7]:
import os
import shutil
import zipfile
import tempfile
import logging
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, count
from pyspark.sql.types import FloatType, StringType, DateType

# Suppress unnecessary warnings
warnings.filterwarnings("ignore")
logging.getLogger("distributed.shuffle").setLevel(logging.ERROR)

# 创建SparkSession
spark = SparkSession.builder \
    .appName("Process Zip CSV with PySpark") \
    .master("local[*]") \
    .config("spark.hadoop.fs.permissions.enabled", "false") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

def process_zip_file(zip_file_path, output_folder):
    try:
        # Specify temporary directory location
        temp_dir = r'X:\Transfer'

        # Create a unique temporary directory for extraction
        with tempfile.TemporaryDirectory(dir=temp_dir) as temp_subdir:
            # Extract ZIP file to temporary subdirectory
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                zip_ref.extractall(temp_subdir)
                # Assume there is only one CSV file in the ZIP
                csv_file_name = zip_ref.namelist()[0]
                csv_file_path = os.path.join(temp_subdir, csv_file_name)

            # Read CSV into PySpark DataFrame
            df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

            # Filter out invalid BID and OFR values
            df = df.filter((col("BID") > 0) & (col("OFR") > 0))

            # Calculate Spread and Midpoint
            df = df.withColumn('Spread', col('OFR') - col('BID'))
            df = df.filter(col('Spread') <= 5)  # Filter out large spreads

            df = df.withColumn('Midpoint', (col('BID') + col('OFR')) / 2)
            df = df.withColumn('Relative_Quoted_Spread', (col('Spread') / col('Midpoint')) * 100)

            # Convert DATE to date format
            df = df.withColumn('DATE', col('DATE').cast(DateType()))

            # Group by SYMBOL and DATE to compute daily equal-weighted averages
            result = df.groupBy('SYMBOL', 'DATE').agg(
                mean('Relative_Quoted_Spread').alias('Relative_Quoted_Spread_mean'),
                count('SYMBOL').alias('Quote_count')
            )

            # Save the result as CSV
            zip_file_name = os.path.basename(zip_file_path)
            output_file_name = os.path.splitext(zip_file_name)[0] + '_daily_average_quoted_spread.csv'
            output_file_path = os.path.join(output_folder, output_file_name)
            result.coalesce(1).write.csv(output_file_path, header=True, mode='overwrite')
            print(f"Results saved to {output_file_path}")

    except Exception as e:
        print(f"Error processing file {zip_file_path}: {e}")

def process_single_zip_file(zip_file_name, source_folder, transfer_folder, output_folder):
    source_zip_path = os.path.join(source_folder, zip_file_name)
    local_zip_path = os.path.join(transfer_folder, zip_file_name)

    # Copy the ZIP file to the local transfer folder
    try:
        print(f"Copying {source_zip_path} to {local_zip_path}...")
        shutil.copy2(source_zip_path, local_zip_path)
        print("Copy completed.")
    except Exception as e:
        print(f"Error copying file {source_zip_path}: {e}")
        return  # Skip this file

    # Process the local ZIP file
    process_zip_file(local_zip_path, output_folder)

    # Delete the local ZIP file after processing
    try:
        os.remove(local_zip_path)
        print(f"Deleted local file {local_zip_path}.")
    except Exception as e:
        print(f"Error deleting file {local_zip_path}: {e}")

def main():
    # Define paths
    source_folder = r'X:Quote_2001.zip'  # Source folder containing ZIP files
    transfer_folder = r'X:\Transfer'  # Temporary transfer folder
    output_folder = r'C:\Users\lishe\Documents\GitHub\Replicate-Work\TAQ_Output\Quote_Daily'  # Output folder

    # Create necessary directories if they don't exist
    os.makedirs(transfer_folder, exist_ok=True)
    os.makedirs(output_folder, exist_ok=True)

    # Get a list of all ZIP files in the source folder
    zip_files = [f for f in os.listdir(source_folder) if f.endswith('.zip')]

    # Process the ZIP files sequentially (can be parallelized)
    for zip_file_name in zip_files:
        process_single_zip_file(zip_file_name, source_folder, transfer_folder, output_folder)

    # Stop the Spark session
    spark.stop()
    print("All files processed, program completed.")

if __name__ == "__main__":
    main()


NotADirectoryError: [WinError 267] 目录名称无效。: 'X:Quote_2001.zip'