# Python访问数据库

小型应用可以将数据存储在文件中，但是对于大中型应用则需要成熟的数据库系统的支持。在深入了解数据库系统前，我们先了解一下网络服务。

## 一、网络服务
网络服务一般基于分布式技术，通过网络提供服务的应用程序。网络服务一般采用客户端服务器（Client/Server）架构结构或浏览器服务器（Browser/Server）架构。在使用网络服务时，首先启动客户端，然后通过相关命令告知服务器进行连接以完成各种操作，而服务器则按照此请示提供相应的服务。每一个客户端软件的实例都可以向一个服务器或应用程序服务器发出请求。

- 数据库服务（TCP）
- Web服务（HTTP/HTTPS）
- 文件服务（FTP/SFTP）
- 邮件服务（SMTP/POP3）
- ......

![](images/cs.png)

目前主流的数据库按架构可分为：
- 基于文件系统的数据库，如：SQLite、Microsoft Access
- 基于客户端服务器架构的数据库，如：MySQL、Oracle、SQL Server、MongoDB、Redis

按模型分为：
- 关系型数据库，如：SQLite、MySQL、Oracle、SQL Server
- 非关系型数据库，如：MongoDB、Redis

接下里我们来讨论Python对这些数据库的访问能力，以获取和处理数据库中的数据。

## 二、Python访问数据库
### 1. SQL
SQL是访问关系型数据的的标准语言，可以使用SQL语句操作诸如SQLite、MySQL、Oracle、SQL Server等关系型数据库。所有关系型数据库都是库表结构，实现对数据增删改查基本SQL语句如下：

```mysql
insert into table_name (column1, column2, column3, ...) values (value1, value2, value3, ...)

delete from table_name where some_column=some_value;

update table_name set column1=value1, column2=value2, ... where some_column=some_value;

select column_name, column_name from table_name where some_column=some_value order by column_name asc|desc;
```

实现对库表结构的增删改基本SQL语句如下（注意不同关系型数据库产品可能会略有不同）：

```mysql
create database dbname;
drop database dbname;
```

```mysql
create table table_name 
(
column_name1 data_type(size),
column_name2 data_type(size),
....
);
alter table table_name add column column_name datatype
alter table table_name drop column column_name
alter table table_name alter column column_name datatype
drop table table_name;
```

### 2. 访问SQLite
SQLite是一个非常轻量级的关系型数据库，广泛应用于移动应用开发中。SQLite没有服务器端，通过API即可完成SQL语句操作，SQLite将数据存储在一个本地文件中，甚至存储在内存中。Python标准库中带有对SQLite访问的API，该API遵循Python DB-API规范。

![](images/sqlite.png)

In [1]:
import sqlite3
# conn = sqlite3.connect(':memory:') # 数据存储在内存中
conn = sqlite3.connect('data/ch07/sqlite.db') # 数据存储在文件中
conn.execute("drop table if exists quotation")
sql="""
create table quotation 
(
id integer primary key autoincrement,
stock_code varchar(20),
trade_date int,
open_price real,
close_price real
)
"""
conn.execute(sql)
conn.commit()

In [2]:
sql = "insert into quotation (stock_code, trade_date, open_price, close_price) \
values ('600864', 20210104, 8.59, 8.34)"
conn.execute(sql)
conn.commit()
data = [('600584', 20210104, 42.50, 42.53),
        ('600864', 20210105, 8.40, 8.51),
        ('600584', 20210105, 42.25, 42.29)]
sql = "insert into quotation (stock_code, trade_date, open_price, close_price) \
values (? ,?, ?, ?)"
conn.executemany(sql, data)
conn.commit()

In [3]:
cursor = conn.execute("select * from quotation")
rows = cursor.fetchall()
rows

[(1, '600864', 20210104, 8.59, 8.34),
 (2, '600584', 20210104, 42.5, 42.53),
 (3, '600864', 20210105, 8.4, 8.51),
 (4, '600584', 20210105, 42.25, 42.29)]

In [4]:
cursor.description # 游标description属性包含列名

(('id', None, None, None, None, None, None),
 ('stock_code', None, None, None, None, None, None),
 ('trade_date', None, None, None, None, None, None),
 ('open_price', None, None, None, None, None, None),
 ('close_price', None, None, None, None, None, None))

In [5]:
import pandas as pd
# 数据直接传入DataFrame构造器，注意Python 3中zip函数返回值发生变化
pd.DataFrame(rows, columns=list(zip(*cursor.description))[0]) 

Unnamed: 0,id,stock_code,trade_date,open_price,close_price
0,1,600864,20210104,8.59,8.34
1,2,600584,20210104,42.5,42.53
2,3,600864,20210105,8.4,8.51
3,4,600584,20210105,42.25,42.29


In [6]:
cursor.close()
conn.close()

#### 编程实践：尝试连接SQLite数据库并操作

### 3. 访问MySQL
![](images/mysql.png)

In [7]:
import pymysql

In [8]:
conn = pymysql.connect(host='localhost', port=3306, user='root', password='root')
cursor = conn.cursor()

In [9]:
cursor.execute('drop database if exists test')
cursor.execute('create database test')
conn.select_db('test')
sql="""
create table quotation 
(
id int(10) primary key auto_increment,
stock_code varchar(20),
trade_date int(10),
open_price float,
close_price float
)
"""
cursor.execute(sql)

0

In [10]:
sql = "insert into quotation (stock_code, trade_date, open_price, close_price) \
values ('600864', 20210104, 8.59, 8.34)"
cursor.execute(sql)
conn.commit()

sql = "insert into quotation (stock_code, trade_date, open_price, close_price) \
values (%s ,%s, %s, %s)"
data = [('600584', 20210104, 42.50, 42.53),
        ('600864', 20210105, 8.40, 8.51),
        ('600584', 20210105, 42.25, 42.29)]
cursor.executemany(sql, data)
conn.commit()

In [11]:
count = cursor.execute("select * from quotation")
count

4

In [12]:
rows = cursor.fetchall()
rows # 返回的是元组

((1, '600864', 20210104, 8.59, 8.34),
 (2, '600584', 20210104, 42.5, 42.53),
 (3, '600864', 20210105, 8.4, 8.51),
 (4, '600584', 20210105, 42.25, 42.29))

In [13]:
import pandas as pd
pd.DataFrame(list(rows), columns=list(zip(*cursor.description))[0])

Unnamed: 0,id,stock_code,trade_date,open_price,close_price
0,1,600864,20210104,8.59,8.34
1,2,600584,20210104,42.5,42.53
2,3,600864,20210105,8.4,8.51
3,4,600584,20210105,42.25,42.29


In [14]:
cursor.close()
conn.close()

#### 编程实践：尝试连接mysql数据库并操作

### 4. 访问mongodb
![](images/mongodb.png)

mongodb与前面的的关系型数据库不同，是所谓NoSQL数据库中的一员。本质上mongodb是一个文档数据库，其内部没有库表结构，取而代之是库和集合，集合内不一定存储相同结构的数据。

![](images/mongodb-collection.png)

mongodb增删改查基本语法如下：

![](images/mongodb-insert.png)
![](images/mongodb-remove.png)
![](images/mongodb-update.png)
![](images/mongodb-find.png)

In [15]:
import pymongo
# client = pymongo.MongoClient('127.0.0.1', 27017)
# client.admin.authenticate('root', 'root', source='admin')

一般使用数据库连接字符串的方式：

In [16]:
client = pymongo.MongoClient('mongodb://root:root@127.0.0.1/admin')
db = client.test # 指定库
db.quotation.drop()
db.quotation.insert_one({'stock_code': '600864', 'trade_date': 20210104, 'open_price': 8.59, 'close_price': 8.34})
data = [{'stock_code': '600584', 'trade_date': 20210104, 'open_price': 42.50, 'close_price': 42.53}, 
        {'stock_code': '600864', 'trade_date': 20210105, 'open_price': 8.40, 'close_price': 8.51},
        {'stock_code': '600584', 'trade_date': 20210105, 'open_price': 42.25, 'close_price': 42.29}]
db.quotation.insert_many(data)

<pymongo.results.InsertManyResult at 0x7f91976fdec8>

In [17]:
db.list_collection_names()

['quotation']

In [18]:
results = db.quotation.find()
list(results)

[{'_id': ObjectId('60ab1d3606dfc1a2da28106f'),
  'stock_code': '600864',
  'trade_date': 20210104,
  'open_price': 8.59,
  'close_price': 8.34},
 {'_id': ObjectId('60ab1d3606dfc1a2da281070'),
  'stock_code': '600584',
  'trade_date': 20210104,
  'open_price': 42.5,
  'close_price': 42.53},
 {'_id': ObjectId('60ab1d3606dfc1a2da281071'),
  'stock_code': '600864',
  'trade_date': 20210105,
  'open_price': 8.4,
  'close_price': 8.51},
 {'_id': ObjectId('60ab1d3606dfc1a2da281072'),
  'stock_code': '600584',
  'trade_date': 20210105,
  'open_price': 42.25,
  'close_price': 42.29}]

In [19]:
result = db.quotation.find_one({'stock_code': '600864'})
result

{'_id': ObjectId('60ab1d3606dfc1a2da28106f'),
 'stock_code': '600864',
 'trade_date': 20210104,
 'open_price': 8.59,
 'close_price': 8.34}

In [20]:
db.quotation.update_one({'stock_code': '600864'}, {'$set': {'close_price': 100} })
result = db.quotation.find_one({'stock_code': '600864'})
result

{'_id': ObjectId('60ab1d3606dfc1a2da28106f'),
 'stock_code': '600864',
 'trade_date': 20210104,
 'open_price': 8.59,
 'close_price': 100}

In [21]:
db.quotation.update_one({'stock_code': '000001'}, {'$setOnInsert' :{'stock_code': '000001', 'trade_date': 20210104, 'open_price': 10, 'close_price': 10}}, upsert = True )
result = db.quotation.find_one({'stock_code': '000001'})
result

{'_id': ObjectId('60ab1d36895c3c69dbbcf52e'),
 'stock_code': '000001',
 'close_price': 10,
 'open_price': 10,
 'trade_date': 20210104}

In [22]:
db.quotation.delete_one({'stock_code': '000001'})
db.quotation.update_one({'stock_code': '600000'}, {'$setOnInsert' :{'stock_code': '600000', 'trade_date': 20210104}}, upsert = True )
results = db.quotation.find() # MongoDB中collection中的文档结构不一定要一样
list(results)

[{'_id': ObjectId('60ab1d3606dfc1a2da28106f'),
  'stock_code': '600864',
  'trade_date': 20210104,
  'open_price': 8.59,
  'close_price': 100},
 {'_id': ObjectId('60ab1d3606dfc1a2da281070'),
  'stock_code': '600584',
  'trade_date': 20210104,
  'open_price': 42.5,
  'close_price': 42.53},
 {'_id': ObjectId('60ab1d3606dfc1a2da281071'),
  'stock_code': '600864',
  'trade_date': 20210105,
  'open_price': 8.4,
  'close_price': 8.51},
 {'_id': ObjectId('60ab1d3606dfc1a2da281072'),
  'stock_code': '600584',
  'trade_date': 20210105,
  'open_price': 42.25,
  'close_price': 42.29},
 {'_id': ObjectId('60ab1d37895c3c69dbbcf532'),
  'stock_code': '600000',
  'trade_date': 20210104}]

## 三、 代码阅读：封装数据库操作类

In [23]:
import pymysql
import logging
import pandas as pd

logger = logging.getLogger(__name__)


class MySqlHelper:
    """
    MySql帮助类
    """

    def __init__(self, **config):
        self.username = config['username'] if 'username' in config else 'root'
        self.password = config['password'] if 'password' in config else 'root'
        self.database = config['database'] if 'database' in config else 'db'
        self.host = config['host'] if 'host' in config else 'localhost'
        self.port = config['port'] if 'port' in config else 3306
        self.charset = config['charset'] if 'charset' in config else "utf8"
        self.connection = None
        self.connect()

    @staticmethod
    def escape(string):
        return '`%s`' % string

    def connect(self):
        """
        建立连接
        """
        if self.connection:
            self.connection.close()
        try:
            self.connection = pymysql.connect(
                user=self.username,
                password=self.password,
                host=self.host,
                port=self.port,
                database=self.database,
                charset='utf8mb4')
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False

    def disconnect(self):
        """
        断开连接
        """
        if self.connection:
            self.connection.close()
            self.connection = None

    def replace(self, table='', close=False, record=None):
        """
        根据主键或唯一索引更新记录或新增记录，注意replace方法将删除具有相同主键或唯一索引值的记录，然后插入新记录，注意其使用场景
        :param str table: 表名
        :param bool close: 结束后断开连接
        :param dict record: 更新或新增的记录
        :return bool: 操作成功与否
        """
        if record is None:
            record = {}
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)

        cur = None
        try:
            cur = self.connection.cursor()
            if record:
                keys, values = zip(*record.items())
                sql = 'replace into %s (%s) values (%s)' % (table,
                                                            ', '.join([self.escape(key) for key in keys]),
                                                            ', '.join(['%s', ] * len(values)))
                cur.execute(sql, self._to_none(values))
                self.connection.commit()
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def replace_many(self, table='', close=False, records=None):
        """
        根据主键更新记录或新增多条记录
        :param str table: 表名
        :param bool close: 结束后断开连接
        :param list records: 更新或新增的记录集合，item是dict
        :return bool: 操作成功与否
        """
        if records is None:
            records = []
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor()
            if records and len(records) > 0:
                keys, values = zip(*records[0].items())
                sql = 'replace into %s (%s) values (%s)' % (table,
                                                            ', '.join(self.escape(k) for k in keys),
                                                            ', '.join(['%s', ] * len(values)))
                arr = []
                for result in records:
                    arr.append(self._to_none(result.values()))
                cur.executemany(sql, arr)
                self.connection.commit()
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def update(self, table='', where='', close=False, record=None):
        """
        更新一个表中的一条记录
        :param str table: 表名
        :param str where: sql条件语句
        :param bool close: 结束后断开连接
        :param dict record: 更新的记录
        :return bool: 操作成功与否
        """
        if record is None:
            record = {}
        if self.connection is None or not self.connection.open:
            self.connect()
        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor()
            if record:
                pairs = ', '.join(self.escape(k) + '=%s' for k in record)
                w = (' where ' + where) if where else ''
                sql = 'update %s set %s %s' % (table, pairs, w)
                cur.execute(sql, self._to_none(record.values()))
                self.connection.commit()
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def insert(self, table='', close=False, record=None):
        """
        根据条件插入数据
        :param str table: 表名
        :param bool close: 是否结束时关闭连接
        :param dict record: 待插入数据
        :return bool: 操作成功与否
        """

        if record is None:
            record = {}
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor()
            if record:
                keys, values = zip(*record.items())
                sql = 'insert into %s (%s) values (%s);' % (table,
                                                            ', '.join([self.escape(k) for k in keys]),
                                                            ', '.join(['%s', ] * len(values)))
                cur.execute(sql, self._to_none(values))
                self.connection.commit()
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def exist_table(self, table=''):
        """
        检查表是否存在
        :param str table: 表名
        :return bool: 操作成功与否
        """
        tables = self.sql('show tables')
        for t in tables:
            if table in t.values():
                return True
        return False

    def exec_procedure(self, name=None, params=None):
        """
        执行存储过程
        :param str name: 存储过程名
        :param list params: 存储过程参数
        :return bool: 操作成功与否
        """

        if self.connection is None or not self.connection.open:
            self.connect()

        cur = None
        try:
            cur = self.connection.cursor()
            cur.callproc(name, params)
            return True
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return False
        finally:
            if cur:
                cur.close()

    def select(self, fields='*', table='', page_size=100, page_number=1, where='', order_by='', close=False):
        """
        查询一个表中的记录
        :param str fields: 查询的字段
        :param str table: 表名
        :param int page_size: 获取记录数，等于0不分页
        :param int page_number: 页码，大于0的数字
        :param str where: sql条件语句
        :param str order_by: sql排序语句
        :param bool close: 结束后断开连接
        :return list: 结果记录集合，item是dict
        """
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor(pymysql.cursors.DictCursor)
            w = (' where ' + where) if where else ''
            o = (' order by ' + order_by) if order_by else ''
            offset = (page_number - 1) * page_size
            p = (' limit %d, %d' % (offset, page_size)) if page_size else ''
            sql = 'select %s from %s%s%s%s' % (fields, table, w, o, p)
            cur.execute(sql)
            return cur.fetchall()
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return []
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def select_tables(self, fields='*', table=None, page_size=100, page_number=1, where=None, order_by='', close=False):
        """
        查询多个表中的记录
        :param str fields: 查询的字段
        :param str table: 表名列表
        :param int page_size: 获取记录数，等于0不分页
        :param int page_number: 页码，大于0的数字
        :param str where: sql条件语句
        :param str order_by: sql排序语句
        :param bool close: 结束后断开连接
        :return list: 结果记录集合，item是dict
        """
        if where is None:
            where = []
        if table is None:
            table = []
        if self.connection is None or not self.connection.open:
            self.connect()

        for x in range(len(table)):
            table[x] = self.escape(table[x])
        cur = None
        try:
            cur = self.connection.cursor(pymysql.cursors.DictCursor)
            w = [(' where ' + x) for x in where if x] if [(' where ' + x) for x in where if x] else []
            w.extend([''] * (len(table) - len(w)))
            o = (' order by ' + order_by) if order_by else ''
            offset = (page_number - 1) * page_size
            p = (' limit %d,%d' % (offset, page_size)) if page_size else ''
            sql_union = ''
            for i in range(len(table)):
                sql = 'select %s from %s%s' % (fields, table[i], w[i])
                if i < len(table) - 1:
                    sql_union += sql + ' union '
                else:
                    sql_union += sql + '%s%s' % (o, p)
            cur.execute(sql_union)
            return cur.fetchall()
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return []
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def select_one(self, table='', where='', close=False):
        """
        查询一个表中的一条记录
        :param table: 表名
        :param where: sql条件语句
        :param close: 结束后断开连接
        :return dict: 结果记录
        """
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor(pymysql.cursors.DictCursor)
            w = (' where ' + where) if where else ''
            sql = 'select * from %s%s' % (table, w)
            cur.execute(sql)
            return cur.fetchone()
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return {}
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def sql(self, sql, close=False):
        """
        执行sql语句
        :param str sql: sql完整语句
        :param bool close: 结束后断开连接
        :return list: 结果记录集合，item是dict
        """
        if self.connection is None or not self.connection.open:
            self.connect()

        cur = None
        try:
            cur = self.connection.cursor(pymysql.cursors.DictCursor)
            cur.execute(sql)
            self.connection.commit()
            try:
                return cur.fetchall()
            except Exception:
                return None
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return []
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    def count(self, table='', where='', close=False):
        """
        查询一个表中的记录数
        :param str table: 表名
        :param str where: sql条件语句
        :param bool close: 结束后断开连接
        :return int: 记录数
        """
        if self.connection is None or not self.connection.open:
            self.connect()

        table = self.escape(table)
        cur = None
        try:
            cur = self.connection.cursor(pymysql.cursors.DictCursor)
            w = (' where ' + where) if where else ''
            sql = 'select count(*) as count from %s %s' % (table, w)
            cur.execute(sql)
            return cur.fetchone()
        except Exception:
            logger.error('An error occurred', exc_info=True)
            return 0
        finally:
            if cur:
                cur.close()
            if close:
                self.disconnect()

    @staticmethod
    def _to_none(values):
        """
        替换nan等特殊空值
        """
        return [None if pd.isna(value) else value for value in values]

In [24]:
helper = MySqlHelper(username='root', password='root', host='127.0.0.1', port=3306, database='test')
print(helper.exist_table('quotation'))
helper.sql('update quotation set close_price=100 where id=1')
r = helper.sql('select * from quotation')
print(r)
q = helper.select_one(table='quotation', where='id=1')
print(q)
q['close_price'] = 200
helper.replace(table='quotation', record=q)
q = helper.select(table='quotation', where='id=1')
print(q)

True
[{'id': 1, 'stock_code': '600864', 'trade_date': 20210104, 'open_price': 8.59, 'close_price': 100.0}, {'id': 2, 'stock_code': '600584', 'trade_date': 20210104, 'open_price': 42.5, 'close_price': 42.53}, {'id': 3, 'stock_code': '600864', 'trade_date': 20210105, 'open_price': 8.4, 'close_price': 8.51}, {'id': 4, 'stock_code': '600584', 'trade_date': 20210105, 'open_price': 42.25, 'close_price': 42.29}]
{'id': 1, 'stock_code': '600864', 'trade_date': 20210104, 'open_price': 8.59, 'close_price': 100.0}
[{'id': 1, 'stock_code': '600864', 'trade_date': 20210104, 'open_price': 8.59, 'close_price': 200.0}]
