In [1]:
import pandas as pd
import numpy as np
pd.set_option("display.show_dimensions", False)
pd.set_option("display.float_format", "{:4.2g}".format)

In [2]:
from IPython.core.magic import register_line_magic

@register_line_magic
def C(line):
    from IPython.core.getipython import get_ipython
    from fnmatch import fnmatch

    line = line.strip()
    if ' ' in line:
        idx_space = line.index(' ')
        space_num = line[:idx_space]
        if space_num.isdecimal():
            space_num = int(space_num)
            line = line[idx_space:]
        else:
            space_num = 5
    else:
        space_num = 5

    output_dict = {}
    cmds = line.split(';')
    for cmd in cmds:
        cmd = cmd.strip()
        if cmd != "":
            output_dict[cmd] = repr(eval(cmd)).split("\n")

    str_maxlen_in_cols = [max(len(cmd), len(max(data, key=len))) for cmd, data in output_dict.items()]
    data_row_max = max([len(v) for v in output_dict.values()])

    out_lines = [""]*(data_row_max+2)

    space=''
    for i, (cmd, data) in enumerate(output_dict.items()):
        w = str_maxlen_in_cols[i]

        out_lines[0]+=space+f'{cmd:^{w}}'
        out_lines[1]+=space+"-"*w
        for j, d in enumerate(data, 2):
            out_lines[j]+=space+f'{d:{w}}'

        if len(data) < data_row_max:
            for j in range(len(data)+2, data_row_max+2):
                out_lines[j]+=space+' '*w
        
        space = ' '*space_num

    for line in out_lines:
        print(line)

## 檔案的輸入輸出

表 5-3 輸入輸出函數
|函數名稱 |說明 |
|--------|-----|
|read_csv() |從 CSV 格式的文字檔讀取資料 |
|read_excel() |從 Excel 檔案讀取資料 |
|HDFStore() |使用 HDF5 檔案讀寫資料 |
|read_sql() |從 SQL 資料庫的查詢結果載入資料 |
|read_pickle() |讀取 Pickle 序列化之後的資料 |

### CSV檔案

`read_csv()` 從文字檔讀取資料，它的可選參數非常多，下面只簡介一些常用的參數：
- `sep` : 指定資料的分隔符號，可以使用正規表示法，預設值為逗號。有時 CSV 檔案為了便於閱讀，在逗號之後增加了一些空格以對齊每列的資料。如果希望忽略這些空格，可以將 `skipinitialspace` 參數設定為 `True`。
- 如果資料使用 空格 或 定位字符 分隔，可以不設定 `sep` 參數，而將 `delim_whitespace` 參數設定為 `0`。
- 預設情況下第一行文字被作為列索引標籤，如果資料檔案中沒有儲存列名稱的行，可以設定 `header` 參數為 `0`。
- 如果資料檔案之前包含一些說明行，可以使用 `skiprows` 參數指定資料開始的行號。
- `na_values`, `true_values`, `false_values` 等參數分別指定 `NaN`, `True`, `False` 對應的字串清單。
- 如果希望從字串讀取資料，可以使用 `io.BytesIO(string)` 將字串包裝成輸入流。
- 如果希望將字串轉為 時間，可以使用 `parse_dates` 指定轉為時間的列。
- 如果資料中包含中文，可以使用 `encoding` 參數指定檔案的編碼，例如 "utf-8", "gbk" 等。指定編碼之後獲得的字串列為 Unicode 字串。
- 可以使用 `usecols` 參數指定需要讀取的列。
- 當檔案很大時，可以用 `chunksize` 參數指定一次讀取的行數。當使用 `chunksize` 時，`read_csv()` 傳回一個反覆運算器。
- 當檔案名稱包含中文時，需要使用 Unicode 字串指定檔案名稱。

下面使用上面介紹的各個參數讀取上海市的空氣品質資料檔案。該檔案的文字編碼為 UTF-8，並且帶 BOM 。所謂 BOM ，是指在檔案開頭的 3 個特殊位元組表示該檔案為 UTF-8 檔案。對於帶 BOM 的 UTF-8 檔案，可以指定編碼參數 `encoding` 為 "utf-8-sig"。

該檔案中有兩種字元表示缺失資料：一個是減號，另一個是全形的橫杠。由於 `read_csv()` 在將位元組字串轉為 Unicode 之前判斷 `NaN` ，因此需要使用與檔案相同的編碼表示這些缺失資料的字串。

> **LINK**

> http://air.epmap.org/

> 空氣質量資料來源：青悅空氣質量歷史資料庫

In [3]:
df_list = []

for df in pd.read_csv(
        u"data/aqi/上海市_201406.csv", 
        encoding="utf-8-sig",  #檔案解碼
        chunksize=100,         #一次讀入的行數
        usecols=[u"時間", u"監測點", "AQI", "PM2.5", "PM10"], #只讀入這些列
        na_values=["-", "—"],  #這些字串表示缺失資料
        parse_dates=[0]):      #第一列為時間列
    df_list.append(df)  #在這裡處理資料

%C df_list[0].count(); df_list[0].dtypes

df_list[0].count()        df_list[0].dtypes   
------------------     -----------------------
時間       100           時間       datetime64[ns]
監測點       90           監測點              object
AQI      100           AQI               int64
PM2.5    100           PM2.5             int64
PM10      98           PM10            float64
dtype: int64           dtype: object          


注意：「時間」列為 `datetime64[ns]` 型態，而由於存在缺失資料，因此 "PM10" 列被轉為浮點數型態，其他的數值列為整數型態，而「監測點」列中儲存的是 Unicode 字串。

In [4]:
# print( type(df.loc[0, u"監測點"]) )
print( type(df[u"監測點"].iloc[0]) )

<class 'str'>


### HDF5檔案

> **LINK**

> http://www.nsmc.cma.gov.cn/FENGYUNCast/docs/HDF5.0_chinese.pdf

> 中文的HDF5使用簡介

HDF5 是儲存科學計算資料的一種檔案格式，支援大於 2GB 的檔案，可以把它看作針對科學計算的資料庫檔案。關於 HDF5 檔案格式的更多資訊，請參考上面的連結：(2022-01-19 連結己失效)

HDF5 檔案像一個儲存資料的檔案系統，其中只有兩種型態的物件：資料資料(dataset) 和 目錄(group)：

- 資料資料(dataset)：像檔案系統中的檔案一樣用於儲存各種資料，例如 numpy 陣列。
- 目錄(group)：類似檔案系統中的資料夾，可以包含其他的 目錄(group) 或 資料資料(dataset)。

使用 Pandas 可以很方便地將多個 `Series` 和 `DataFrame` 儲存進 HDF5 檔案。HDF5 檔案採用二進位格式儲存資料，可以對資料進行壓縮儲存，比文字檔案更節省空間，存取也更迅速。

下面建立一個 `HDFStore` 物件，透過 `complib` 參數指定使用 "blosc" 壓縮資料，透過 `complevel` 參數指定壓縮層級。

In [5]:
store = pd.HDFStore("a.hdf5", complib="blosc", complevel=9)

`HDFStore` 物件支援字典介面，例如使用 `[]` 存取元素, `get()` 和 `keys()` 等方法。

In [6]:
df1 = pd.DataFrame(np.random.rand(100000, 4), columns=list("ABCD"))
df2 = pd.DataFrame(np.random.randint(0, 10000, (10000, 3)), 
                   columns=["One", "Two", "Three"])
s1 = pd.Series(np.random.rand(1000))
store["dataframes/df1"] = df1
store["dataframes/df2"] = df2
store["series/s1"] = s1
print( store.keys() )
print( df1.equals(store["dataframes/df1"]) )

['/series/s1', '/dataframes/df1', '/dataframes/df2', '/dataframes/df_dynamic1']
True


`HDFStore` 採用 `pytables` 擴充庫存取 HDF5 檔案，其 `get_node()` 方法可以獲得 `pytables` 中定義的 `Node` 物件。使用該物件可以檢查檔案中的所有節點，關於 `Node` 物件的用法，請參考 `pytables` 的文件。

> **LINK**

> http://pytables.github.io/usersguide/libref/hierarchy_classes.html
>
> `pytables`官方文件

下面用 `get_node()` 獲得根節點，然後呼叫 `_f_walknodes()` 檢查其包含的所有節點。由結果可知，`HDFStore` 中的 `Series` 和 `DataFrame` 物件與 HDF5 的目錄對應，目錄中透過多個資料資料(dataset)儲存實際的資料。

In [7]:
root = store.get_node("//")
for node in root._f_walknodes():
    print( node )

/dataframes (Group) ''
/series (Group) ''
/dataframes/df1 (Group) ''
/dataframes/df2 (Group) ''
/dataframes/df_dynamic1 (Group) ''
/series/s1 (Group) ''
/series/s1/index (CArray(1000,)shuffle, blosc(9)) ''
/series/s1/values (CArray(1000,)shuffle, blosc(9)) ''
/dataframes/df1/axis0 (CArray(4,)shuffle, blosc(9)) ''
/dataframes/df1/axis1 (CArray(100000,)shuffle, blosc(9)) ''
/dataframes/df1/block0_items (CArray(4,)shuffle, blosc(9)) ''
/dataframes/df1/block0_values (CArray(100000, 4)shuffle, blosc(9)) ''
/dataframes/df2/axis0 (CArray(3,)shuffle, blosc(9)) ''
/dataframes/df2/axis1 (CArray(10000,)shuffle, blosc(9)) ''
/dataframes/df2/block0_items (CArray(3,)shuffle, blosc(9)) ''
/dataframes/df2/block0_values (CArray(10000, 3)shuffle, blosc(9)) ''
/dataframes/df_dynamic1/table (Table(100000,)shuffle, blosc(9)) ''


透過前面介紹的方法將 `DataFrame` 物件儲存進 `HDFStore` 之後，無法再為其追加資料。在資料獲取和匯入大量 CSV 檔案時，我們通常希望能不斷地往同一 `DataFrame` 中增加新的資料。可以使用 `append()` 方法實現該功能。

❶ `append` 參數為 `False` 表示將覆蓋己存在的資料，如果指定鍵值不存在，可省略該參數。

❷ 將 `df3` 追加到指定鍵，因此讀取該鍵將獲得一個長度為 100100 的 `DataFrame` 物件。

In [8]:
store.append('dataframes/df_dynamic1', df1, append=False) #❶
df3 = pd.DataFrame(np.random.rand(100, 4), columns=list("ABCD"))
store.append('dataframes/df_dynamic1', df3) #❷
store['dataframes/df_dynamic1'].shape

(100100, 4)

使用 `append()` 將建立 `pytables` 中支援索引的表格(Table)節點，預設使用 `DataFrame` 的 `index` 作為索引。透過 `select()` 可以對表格進行查詢以取得滿足查詢準則的行(row)。在下面的程式中，透過 `where` 參數指定查詢準則，`index` 表示 `DataFrame` 的標籤資料。該條件取得標籤在 97 到 102 之間的所有行。由於我們將兩個預設標籤的 `DataFrame` 增加進該表格，因此 98 和 99 各對應兩行資料。使用該方式讀取部分資料時，可以減少記憶體使用量和磁碟讀取量，加強資料的存取速度。

In [9]:
print( store.select('dataframes/df_dynamic1', where='index > 97 & index < 102') )

        A    B     C     D
98   0.65 0.93  0.74  0.76
99   0.52 0.85 0.062  0.22
100  0.44 0.11  0.93  0.24
101   0.9 0.51  0.23  0.35
98   0.21 0.49 0.069 0.077
99  0.045 0.52  0.74  0.99


如果希望對 `DataFrame` 的指定列進行索引，可以在用 `append()` 建立新的表格時，透過 `data_columns` 指定索引列，或將其設定為 `True` 以對所有列建立索引。

In [10]:
store.append('dataframes/df_dynamic1', df1, append=False, data_columns=["A", "B"])
print( store.select('dataframes/df_dynamic1', where='A > 0.99 & B < 0.01') )

         A       B    C     D
1342     1   0.003 0.28  0.79
6605     1  0.0078 0.77  0.65
13779 0.99  0.0041 0.17   0.4
14888    1  0.0039 0.11  0.93
42852 0.99  0.0043 0.51  0.14
54590 0.99  0.0031 0.92     1
73805    1  0.0078 0.35  0.65
79281 0.99 0.00061 0.83  0.61
82027 0.99  0.0075 0.83  0.15
95532 0.99   0.003 0.33 0.076


下面循環讀取 `data\aqi` 路徑之下的所有 CSV 檔案，並將資料寫入 HDF5 檔案中。在將多個文字檔的資料逐次寫入 HDF5 檔案時，需要注意以下幾點事項：

- HDF5 檔案不支援 Unicode 字串，因此需要對 Unicode 字串進行編碼，轉為位元組字串。在本例中直接從檔案讀取 UTF-8 編碼的字串，因此在讀取 CSV 檔案時無須指定 `encoding` 參數。❶ 但是由於檔案可能包含 UTF-8 的 BOM，因此需要先讀取檔案的頭三個位元組並與 BOM 比較，這樣才能保障讀取的資料中與第一列對應的標籤不包含 BOM。
- 由於可能存在缺失資料，因此讀取的數值列的型態可能為整數和浮點數。由於 HDF5 檔案中的每列資料只能對應一種型態。❷ 因此需要使用 `dtype` 參數指定這些數值列的型態為浮點數。
- ❸ 需要為 HDF5 檔案中的字串列指定最大長度，否則該最大長度將由第一個被增加進 HDF5 檔案的資料的物件決定。

> **WARNING**

> 由於所有從CSV檔案讀入`DataFrame`物件的行索引都為預設值，因此HDF5檔案中的資料的行索引並不是唯一的。

In [11]:
def read_aqi_files(fn_pattern):
    from glob import glob
    from os import path
    
    UTF8_BOM = b"\xEF\xBB\xBF"
    
    cols = "時間,城市,監測點,質量等級,AQI,PM2.5,PM10,CO,NO2,O3,SO2".split(",")
    float_dtypes = {col:float for col in "AQI,PM2.5,PM10,CO,NO2,O3,SO2".split(",")}
    names_map = {"時間":"Time", 
                 "監測點":"Position", 
                 "質量等級":"Level", 
                 "城市":"City", 
                 "PM2.5":"PM2_5"}
    
    for fn in glob(fn_pattern):
        with open(fn, "rb") as f:
            sig = f.read(3) #❶
            if sig != UTF8_BOM:
                f.seek(0, 0)
            df = pd.read_csv(f, 
                             parse_dates=[0], 
                             na_values=["-", "—"], 
                             usecols=cols, 
                             dtype=float_dtypes) #❷
        # df.rename_axis(names_map, axis=1, inplace=True) 
        df.rename(columns=names_map, inplace=True) 
        df.dropna(inplace=True)
        yield df

store = pd.HDFStore("data/aqi/aqi.hdf5", complib="blosc", complevel=9)
string_size = {"City": 12, "Position": 30, "Level":12}

for idx, df in enumerate(read_aqi_files(u"data/aqi/*.csv")):
    store.append('aqi', df, append=idx!=0, min_itemsize=string_size, data_columns=True) #❸
    
store.close()

下面開啟 aqi.hdf5 檔案並讀取所有資料：

In [12]:
store = pd.HDFStore("data/aqi/aqi.hdf5")
df_aqi = store.select("aqi")
print( len(df_aqi) )

337250


下面唯讀取 PM2.5 值大於 500 的行：

In [13]:
df_polluted = store.select("aqi", where="PM2_5 > 500")
print( len(df_polluted) )

87


### 讀寫資料庫

用 `to_sql()` 可以將資料寫入 SQL 資料庫，它的第一個參數為資料庫的表名，第二個參數為表示與資料庫連接的 `Engine` 物件，`Engine` 在 `sqlalchemy` 函數庫中定義。下面首先從 `sqlalchemy` 中載入 `create_engine()` ，並呼叫它使用 SQLite 開啟資料庫檔案 "data/aqi/aqi.db" 。當該檔案不存在時，將建立新的資料庫檔案：

In [14]:
from sqlalchemy import create_engine
engine = create_engine('sqlite:///data/aqi/aqi.db')

為了避免重複寫入，下面先透過 `engine` 物件執行 SQL 敘述，刪除 aqi 表：

In [15]:
try:
    engine.execute("DROP TABLE aqi")
except:
    pass

然後呼叫 `to_sql()` 將資料寫入資料庫，`if_exists` 參數為 `"append"` 表示當表存在時，將新資料增加到表中。由於本例中 `DataFrame` 物件的行索引無實際意義，因此設定 `index` 參數為 `False`，表示不儲存行索引。由於資料庫要求使用 Unicode 字串，因此在寫入資料庫之前對字串進行解碼，將其資料轉為 Unicode 字串。如果在從 CSV 檔案讀取資料時，透過 `encoding` 參數指定了文字編碼，則不必執行此步驟。

In [16]:
str_cols = ["Position", "City", "Level"]

for df in read_aqi_files("data/aqi/*.csv"):
    for col in str_cols:
        df[col] = df[col].str.decode("utf8")
    df.to_sql("aqi", engine, if_exists="append", index=False)

下面呼叫 `read_sql()` 從資料庫讀取整數個名為 aqi 的表:

In [17]:
df_aqi = pd.read_sql("aqi", engine)

也可以透過 SQL 查詢敘述讀取部分資料，下面只讀取 PM2.5 值大於 500 的行：

In [18]:
df_polluted = pd.read_sql("select * from aqi where PM2_5 > 500", engine)
print( len(df_polluted) )

87


### 使用Pickle序列化

還可以使用 `to_pickle()` 和 `read_pickle()` 對 `DataFrame` 物件進行序列化和反序列化：

In [19]:
df_aqi.to_pickle("data/aqi/aqi.pickle")
df_aqi2 = pd.read_pickle("data/aqi/aqi.pickle")
df_aqi.equals(df_aqi2)

True

Pickle 是 python 特有的物件序列化格式，因此很難使用其他軟體、程式語言讀取 Pickle 化之後的資料，但是作為臨時儲存運算的中間結果還是很方便的。