# 数据库连接与操作


##  mysql-connector
官方提供的connector

### 创建数据库连接
常用的参数就是host, port, user, passwd, db,charset。 这里需要说明下charset这个参数，不能写utf-8，得写utf8（或者utf8mb4，根据具体情况，建议数据库的charset都设置成utf8mb4） 

:param str host:        host to connect
:param str user:        user to connect as
:param str password:    password to use
:param str passwd:      alias of password, for backward compatibility
:param str database:    database to use
:param str db:          alias of database, for backward compatibility
:param int port:        TCP/IP port to connect to
:param str unix_socket: location of unix_socket to use
:param dict conv:       conversion dictionary, see MySQLdb.converters
**:param int connect_timeout:**
    number of seconds to wait before the connection attempt fails.

:param bool compress:   if set, compression is enabled
:param str named_pipe:  if set, a named pipe is used to connect (Windows only)
:param str init_command:
    command which is run once the connection is created

:param str read_default_file:
    file from which default client values are read

:param str read_default_group:
    configuration group to use from the default file

**:param type cursorclass:**
    class object, used to create cursors (keyword only)

:param bool use_unicode:
    If True, text-like columns are returned as unicode objects
    using the connection's character set. Otherwise, text-like
    columns are returned as bytes. Unicode objects will always
    be encoded to the connection's character set regardless of
    this setting.
    Default to False on Python 2 and True on Python 3
    so that you can always get python `str` object by default.

:param str charset:
    If supplied, the connection character set will be changed
    to this character set.
    On Python 2, this option changes default value of `use_unicode`
    option from False to True.

**:param str auth_plugin:**
    If supplied, the connection default authentication plugin will be
    changed to this value. Example values:
    `mysql_native_password` or `caching_sha2_password`

:param str sql_mode:
    If supplied, the session SQL mode will be changed to this
    setting.
    For more details and legal values, see the MySQL documentation.

:param int client_flag:
    flags to use or 0 (see MySQL docs or constants/CLIENTS.py)

:param dict ssl:
    dictionary or mapping contains SSL connection parameters;
    see the MySQL documentation for more details
    (mysql_ssl_set()).  If this is set, and the client does not
    support SSL, NotSupportedError will be raised.

:param bool local_infile:
    enables LOAD LOCAL INFILE; zero disables

**:param bool autocommit:**
    If False (default), autocommit is disabled.
    If True, autocommit is enabled.
    If None, autocommit isn't set and server default is used.

:param bool binary_prefix:
    If set, the '_binary' prefix will be used for raw byte query
    arguments (e.g. Binary). This is disabled by default.

There are a number of undocumented, non-standard methods. See the
documentation for the MySQL C API for some hints on what they do.


In [4]:
# 创建数据库连接，建立t_activity表，后面需要用到这个表
# DDL和DML是什么？
# truncate 其实是DDL语句
# DML语句，在binlog里面，其实是会留下数据变更记录
# DDL语句，在binlog里面，就是语句本身
# truncate 在binlog里面，其实就是语句本身
import MySQLdb

host="localhost"
user="root"
passwd="rxy123456"
db_name="zst"
charset="utf8mb4"
if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称
    
    cur = db.cursor()

    try:
        cur.execute("""CREATE TABLE IF NOT EXISTS `t_activity`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `activity_name` varchar(128) COLLATE utf8mb4_bin NOT NULL,
  `hold_time` datetime NOT NULL,
  `form` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;""")
        print("Create table success")
        
        salt_list = []
        for i in range(10):
            salt = ''.join(random.sample(string.ascii_letters + string.digits, 8))
            salt_list.append((salt, salt))
        
        print(salt_list)

        #cur.executemany("INSERT INTO t_activity VALUES (null, %s, Now(), %s)", salt_list)
        cur.execute("INSERT INTO t_activity VALUES (null, %s, Now(), %s)", ("a", "b"))

        db.commit() # 一定需要commit()
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        cur.close()
        db.close()

Create table success
[('fgNWFbPn', 'fgNWFbPn'), ('isRhpq01', 'isRhpq01'), ('2doMBj1Y', '2doMBj1Y'), ('nwPiLUqE', 'nwPiLUqE'), ('Stuc4d3i', 'Stuc4d3i'), ('HsF4WJ0f', 'HsF4WJ0f'), ('dmXBJnDa', 'dmXBJnDa'), ('dnO3erUs', 'dnO3erUs'), ('lt8ByuKL', 'lt8ByuKL'), ('t8pgFyc2', 't8pgFyc2')]


### 执行sql并获取内容

In [6]:
import string
import random
import MySQLdb
import logging

host="localhost"
user="root"
passwd="rxy123456"
db_name="zst"
charset="utf8mb4"

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()

    try:
        cur.execute("select * from t_activity ")
        # 也可以使用fetchmany获取指定数量的行数
        # print(conn.fetchmany(10))
        if cur.rowcount == 0:
            print("No result found")
        else:
            for row in cur.fetchall():
                print(row)
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        cur.close()
        db.close()

(3, 'XKlZUR1E', datetime.datetime(2020, 3, 14, 8, 48, 59), 'XKlZUR1E')
(14, '【dbaplus社群线上分享212期】基于业务模式差异化的运维规划之道', datetime.datetime(2020, 3, 18, 0, 0), '线上分享')
(15, '腾讯云国产数据库TDSQL技术揭秘', datetime.datetime(2020, 3, 17, 0, 0), '线上分享')
(16, 'CSIC2020第五届SaaS应用大会', datetime.datetime(2020, 5, 26, 0, 0), '线下峰会')
(26, 'abc', datetime.datetime(2020, 3, 14, 8, 44, 44), '线下')
(27, '02iR94la', datetime.datetime(2020, 3, 14, 9, 56, 52), '02iR94la')
(28, 'N2E9HAor', datetime.datetime(2020, 3, 14, 9, 56, 52), 'N2E9HAor')
(29, '0ROBswuX', datetime.datetime(2020, 3, 14, 9, 56, 52), '0ROBswuX')
(30, 'PlFniuV5', datetime.datetime(2020, 3, 14, 9, 56, 52), 'PlFniuV5')
(31, 'Rodi4AuO', datetime.datetime(2020, 3, 14, 9, 56, 52), 'Rodi4AuO')
(32, 'DuQVLKfW', datetime.datetime(2020, 3, 14, 9, 56, 52), 'DuQVLKfW')
(33, '8i5VSGsr', datetime.datetime(2020, 3, 14, 9, 56, 52), '8i5VSGsr')
(34, 'jrTS57DF', datetime.datetime(2020, 3, 14, 9, 56, 52), 'jrTS57DF')
(35, '2LVaANmD', datetime.datetime(2020, 3, 14, 9, 56, 5

In [None]:
# 利用fetchone一行行读取数据
import MySQLdb
import logging

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    conn = db.cursor()

    try:
        # 但实际上会把数据全部拿过来，看下fetchone的代码即可
        conn.execute("select * from t_activity")
        print(conn.fetchone())
        print(conn.fetchone())
        print(conn.rowcount)

    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        conn.close()
        db.close()

不同的cursor会有不同的获取数据的逻辑

<img src="mysql_db_cursors.png" />

In [7]:
import MySQLdb
import logging
from MySQLdb.cursors import  SSDictCursor

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    conn = db.cursor(SSDictCursor)

    try:
        conn.execute("select * from t_activity")
        print(conn.fetchone())

    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        conn.close()
        db.close()

{'id': 1, 'activity_name': 'a', 'hold_time': datetime.datetime(2020, 3, 11, 21, 12, 8), 'form': '线下'}


防止SQL注入

In [9]:
#SQL注入的例子
import MySQLdb
import logging

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()
    try:
        print("select * from t_activity where form = '%s' " % "1' or '' = '")
        cur.execute("select * from t_activity where form = '%s' " % "1' or '' = '")
        print(len(cur.fetchall()))

    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        cur.close()
        db.close()

select * from t_activity where form = '1' or '' = '' 
36


In [11]:
# 无法注入SQL
import MySQLdb
import logging

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()
    try:
        cur.execute("select * from t_activity where form = %s", "1")
        print(len(cur.fetchall()))

    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        cur.close()
        db.close()

0


python并不支持mysql预编译语句，其实“参数化”是在MySQLdb中通过转义字符串然后直接将它们插入到查询中而不是使用MYSQL_STMT API来完成的。因此，unicode字符串必须经过两个中间表示（编码字符串，转义编码字符串）才能被数据库接收。

也就是说，我们传入的参数并不是直接拼接到sql语句中，而是经过了字符转换，因此参数化查询可以有效防止sql注入

### 游标控制

In [None]:

import MySQLdb
import logging

host="localhost"
user="root"
passwd="rxy123456"
db_name="zst"
charset="utf8mb4"

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()

    try:
        cur.execute("select * from t_activity")
        # 也可以使用fetchmany获取指定数量的行数
        # print(conn.fetchmany(10))
        if cur.rowcount == 0:
            print("No result found")
        else:
            cur.fetchall()
            # 类似于访问文件的peek
            cur.scroll(-1, mode='relative')
            print(cur.fetchone())
            cur.scroll(2, mode='absolute')
            print(cur.fetchone())
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        cur.close()
        db.close()

### DML

- 获取DML影响的行数
- 获取插入数据的自增ID
- 使用参数化

In [8]:
# 执行普通的插入操作
import MySQLdb

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()

    try:
        r = cur.execute("INSERT INTO t_activity VALUES (null, 'a', Now(), '线下')")
        # 获取插入行的自增ID
        print("lastrowid: " + str(cur.lastrowid))
        # 影响了几行
        print("update row: " + str(r))
        db.commit() # 一定需要commit()
    except Exception as e:
        db.rollback()
    finally:
        cur.close()
        db.close()

lastrowid: 1
update row: 1


In [None]:
import MySQLdb

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    # 可以设置autocommit，或者在connect函数中也可以设置
    db.autocommit(True)
    
    cur = db.cursor()

    try:
        r = cur.execute("INSERT INTO t_activity VALUES (null, 'a', Now(), '线下')")
        # 获取插入行的自增ID
        print("lastrowid: " + str(cur.lastrowid))
        # 影响了几行
        print("update row: " + str(r))
    except Exception as e:
        db.rollback()
    finally:
        cur.close()
        db.close()

In [None]:
if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    conn = db.cursor()

    try:
        for i in range(10):
            salt = ''.join(random.sample(string.ascii_letters + string.digits, 8))
            conn.execute("INSERT INTO t_activity VALUES (null, %s, Now(), %s)", (salt, salt))
        db.commit() # 一定需要commit()
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        conn.close()
        db.close()

In [None]:
import string
import random
import MySQLdb
import logging

if __name__ == '__main__':
    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    conn = db.cursor()

    try:
        salt_list = []
        for i in range(10):
            salt = ''.join(random.sample(string.ascii_letters + string.digits, 8))
            salt_list.append((salt, salt))

        conn.executemany("INSERT INTO t_activity VALUES (null, %s, Now(), %s)", salt_list)

        db.commit() # 一定需要commit()
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        conn.close()
        db.close()

## PyMySQL
PyMySQL 是一个纯 Python 实现的 MySQL 客户端操作库，支持事务、存储过程、批量执行等。 但接口和db是一样的，以前有段时间，mysqldb只有python2版本，而没有python3版本，所以需要PyMySQL进行兼容
为了兼容mysqldb，只需要加入pymysql.install_as_MySQLdb()

## SQLAlchemy
对象-关系映射（Object-Relational Mapping，简称ORM），面向对象的开发方法是当今企业级应用开发环境中的主流开发方法，关系数据库是企业级应用环境中永久存放数据的主流数据存储系统。对象和关系数据是业务实体的两种表现形式，业务实体在内存中表现为对象，在数据库中表现为关系数据。内存中的对象之间存在关联和继承关系，而在数据库中，关系数据无法直接表达多对多关联和继承关系。因此，对象-关系映射(ORM)系统一般以中间件的形式存在，主要实现程序对象到关系数据库数据的映射。  
简单的说，ORM会将数据库中的表映射成一个对象  
java ssh框架 hibernate jpa mybatis  orm框架

### 查询
下面都是一些查询的例子，

In [None]:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, BigInteger, DateTime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import alias
from sqlalchemy import or_
from sqlalchemy import text


Base = declarative_base()


class Activity(Base):
    __tablename__ = 't_activity'

    id = Column(BigInteger, primary_key=True)
    name = Column('activity_name', String(128), nullable=False,)
    time = Column('hold_time', DateTime, nullable=False)
    form = Column('form',String(128))

    def __repr__(self):
        return "<Activity(id={},name={},time={},form={})>".format(self.id, self.name, self.time, self.form)


if __name__ == '__main__':
    engine = create_engine('mysql://root:rxy123456@127.0.0.1/zst?charset=utf8mb4')

    Session = sessionmaker(bind=engine)

    session = Session()
    # 使用filter_by进行查询
    #activity_list = session.query(Activity).filter_by(form='okG7dDRH', name='okG7dDRH').all()
    #activity_list = session.query(Activity).filter_by(Activity.form.isnot(None)).all()

    # 用filter函数

    # eq
    #activity_list = session.query(Activity).filter(Activity.id == 1).all()

    # like
    #activity_list = session.query(Activity).filter(Activity.name.like('%数据库%')).all()

    # in查询
    # activity_list = session.query(Activity).filter(Activity.id.in_([1,2]))

    # or
    # activity_list = session.query(Activity)\
    #    .filter(or_(Activity.name.like('%数据库%'), Activity.name.like('%TBase%'))).all()

    # 排序
    #activity_list = session.query(Activity).filter(Activity.id.in_([1, 2]))\
    #   .order_by(Activity.time.desc())

    # 返回结果
    # first()返回至多一个结果，而且以单项形式，而不是只有一个元素的tuple形式返回这个结果.
    #activity = session.query(Activity).filter(Activity.name.like('%数据库%')).first()

    # one()返回且仅返回一个查询结果。当结果的数量不足一个或者多于一个时会报错。
    #activity = session.query(Activity).filter(Activity.name.like('%数据库%')).one()

    # one_or_none()：从名称可以看出，当结果数量为0时返回None， 多于1个时报错
    # activity = session.query(Activity).filter(Activity.id == 1).one_or_none()
    # print(activity)

    # count
    count = session.query(Activity).filter(Activity.name.like('%数据库%')).count()
    print(count)

    # 使用嵌入式sql
    #activity_list = session.query(Activity).filter(text("id > 2")).all()

    # 使用参数
    #activity_list = session.query(Activity).filter(text("id > :value"))\
    #    .params(value=2).all()

    # 直接使用sql
    activity_list = session.query(Activity).from_statement(text("select * from t_activity"))
    for activity in activity_list:
        print(activity)

    session.commit()

### DML


In [None]:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, BigInteger, DateTime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import alias
from sqlalchemy import or_
from sqlalchemy import text
from datetime import datetime

Base = declarative_base()


class Activity(Base):
    __tablename__ = 't_activity'

    id = Column(BigInteger, primary_key=True)
    name = Column('activity_name', String(128), nullable=False,)
    time = Column('hold_time', DateTime, nullable=False)
    form = Column('form',String(128))

    def __repr__(self):
        return "<Activity(id={},name={},time={},form={})>".format(self.id, self.name, self.time, self.form)


if __name__ == '__main__':
    engine = create_engine('mysql://root:rxy123456@127.0.0.1/zst?charset=utf8mb4')

    Session = sessionmaker(bind=engine)

    session = Session()
    # 插入
    a = Activity(name="abc", time=datetime.now(), form="线下")
    session.add(a)

    # 先查询，后删除
    session.query(Activity).filter(Activity.id == 2).delete()

    session.query(Activity).filter(Activity.id == 3).update({"time": datetime.now()})
    session.commit()

In [None]:
# 抓取程序
import logging
import requests
from bs4 import BeautifulSoup
from bs4 import Tag
import MySQLdb


# 将beautifulsoap的Tag对象，解析成一个词典dict对象
def parse_activity_li(li):
    if not isinstance(li, Tag):
        return None

    name_ele = li.select_one(".activ-text > .text > a")
    if name_ele is None:
        return None

    date_div = li.find(class_="date")
    if date_div is None:
        return None

    span_list = date_div.find_all('span')
    if span_list is None or len(span_list) != 2:
        return None

    form_text = span_list[1].get_text().strip()
    form_splits = form_text.partition("：")
    if len(form_splits) != 3:
        return None

    time_text = span_list[0].get_text().strip()
    time_splits = time_text.partition("：")
    if len(time_splits) != 3:
        return None

    return {
        "activity_name": name_ele.get_text().strip(),
        "hold_time": time_splits[2].strip(),
        "form": form_splits[2].strip()
    }


if __name__ == '__main__':
    print("【".encode('utf-8'))
    # 至少得把日志记录到文件
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")

    host = "localhost"
    user = "root"
    passwd = "rxy123456"
    db_name = "zst"
    charset = "utf8mb4"

    db = MySQLdb.connect(
        host=host,  # 主机名
        user=user,  # 用户名
        passwd=passwd,  # 密码
        db=db_name,
        charset=charset)  # 数据库名称

    cur = db.cursor()

    resp = requests.get("https://dbaplus.cn/")
    if resp.status_code >= 300:
        logging.error("failed to make a request to dbaplus, status_code: %d", resp.status_code)
        exit(1)

    soup = BeautifulSoup(resp.text, 'html.parser')
    list_group = soup.select_one('.list-group')
    if list_group is None:
        logging.error("failed to get list-group")
        exit(1)

    count = 0
    for li in list_group.children:
        if li.name != "li":
            continue
        activity = parse_activity_li(li)
        if activity is None:
            continue

        print(type(activity))
        print(activity)
        logging.debug("activity name: %s", activity['activity_name'])
        cur.execute("select * from t_activity where activity_name = %s", tuple([activity["activity_name"]]))
        logging.debug("last execute: %s", cur._last_executed)
        logging.debug("Row_count1: %d", cur.rowcount)

        if cur.rowcount > 0:
            continue

        cur.execute("insert into t_activity values (null, %(activity_name)s, %(hold_time)s, %(form)s)", activity)
        count += 1

    db.commit()
    logging.info("Insert %d rows", count)




In [None]:
# 查询程序
import MySQLdb
import argparse
import logging
from datetime import date

if __name__ == '__main__':
    argument_parser = argparse.ArgumentParser(description="DBAPLUS活动查询")
    argument_parser.add_argument("-n", "--name", help="名称")
    group = argument_parser.add_mutually_exclusive_group()
    group.add_argument("-t", "--time", nargs=2, help="查询该时间之后的")
    group.add_argument("-a", "--all", help="是否返回", action="store_true")
    argument_parser.add_argument("-d", "--debug", help="debugg",action="store_true")
    args = argument_parser.parse_args()

    if args.debug is True:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    host = "localhost"
    user = "root"
    passwd = "rxy123456"
    db_name = "zst"
    charset = "utf8mb4"

    db = MySQLdb.connect(
        host=host,
        user=user,
        passwd=passwd,
        db=db_name,
        charset=charset)

    sql = "select * from t_activity"
    where_condition = []
    params = []
    # 如果时间参数为空的话， 默认要打印还没有举行的活动
    if args.time is None:
        # 如果all参数也为空的话
        if args.all is False:
            where_condition.append("hold_time >= Now()")
    else:
        # 用户指定了查询指定时间段的活动
        where_condition.append("hold_time >= %s")
        where_condition.append("hold_time <= %s")
        params.append(args.time[0])
        params.append(args.time[1])

    if args.name is not None:
        where_condition.append("activity_name like %s")
        params.append("%" + args.name + "%")

    if len(where_condition) > 0:
        sql += " where "
        for i in range(len(where_condition)):
            if i > 0:
                sql += " and "
            sql += where_condition[i]

    try:
        cur = db.cursor()
        logging.debug("Execute sql: %s", sql)
        logging.debug("params: %s", params)
        cur.execute(sql, tuple(params))
        if cur.rowcount == 0:
            print("No activity found")
        else:
            for row in cur.fetchall():
                print(row)
    except Exception as e:
        logging.exception(e)
        db.rollback()
    finally:
        db.close()
