Skip to content

Commit

Permalink
Merge pull request #2 from noppoMan/lazy-result-set
Browse files Browse the repository at this point in the history
API design changes for 0.2.x
  • Loading branch information
noppoMan committed Nov 6, 2017
2 parents 49bfa9f + 5045045 commit 031a3eb
Show file tree
Hide file tree
Showing 21 changed files with 817 additions and 283 deletions.
29 changes: 23 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ A pure Swift Client implementing the MySql protocol. this is not depend on libmy
* [x] Prepared statements
* [x] Transactions
* [x] JSON Data Type in Mysql 5.7
* [x] Streaming query rows
* [ ] NonBlocking I/O

## Instllation
Expand All @@ -28,8 +29,8 @@ let url = URL(string: "mysql://localhost:3306")
let con = try Connection(url: url!, user: "root", password: "password", database: "swift_mysql")

let result = try con.query("selct * from users")
if let rs = result.asResultSet() {
for row in rs {
if let rows = result.asRows() {
for row in rows {
print(row) // ["id": 1, "name": "Luke", "email": "test@example.com"]
}
}
Expand All @@ -44,7 +45,7 @@ let url = URL(string: "mysql://localhost:3306")
let con = try Connection(url: url!, user: "root", password: "password", database: "swift_mysql")

let result = try con.query("selct * from books where published_at > ? and category_id = ?", [2017, 3])
if let rs = result.asResultSet() {
if let rows = result.asRows() {
for row in rs {
print(row)
}
Expand Down Expand Up @@ -99,22 +100,38 @@ try con.transaction {

### Rollback

## Streaming query rows

Sometimes you may want to select large quantities of rows and process each of them as they are received. This can be done like this

```swift
let result = try con.query("selct * from large_tables")
if let resultFetcher = result.asResultSet() {
print(resultFetcher.columns) // [String]

// resultFetcher.rows is a infinity Sequence
// you can loop it until the Mysql sends EOF packet.
for row in resultFetcher.rows {
print(row)
}
}
```

if the error is thrown in transaction block, `rollback` should be performed.

```swift
try con.transaction {
throw FooError
throw FooError
}
```

## Terminating connections

once call `close` method, the TCP connection is terminated safely.
once call `close` method, the Mysql connection is terminated safely.

```swift
try con.close()
```


## License
SwiftMysql is released under the MIT license. See LICENSE for details.
36 changes: 3 additions & 33 deletions Sources/SwiftMysql/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public final class Connection: ConnectionProtocol {
self.user = user
self.password = password
self.database = database
self.stream = try PacketStream(host: url.host ?? "localhost", port: UInt(url.port ?? 3306))
let tcp = try TCPStream(host: url.host ?? "localhost", port: UInt(url.port ?? 3306))
self.stream = PacketStream(stream: tcp)
try self.open()
}

Expand All @@ -66,14 +67,6 @@ public final class Connection: ConnectionProtocol {
_isClosed = false
}

func write(_ cmd: Commands, query: String) throws {
try stream.writePacket([cmd.rawValue] + query.utf8, packnr: -1)
}

func write(_ cmd: Commands) throws {
try stream.writePacket([cmd.rawValue], packnr: -1)
}

func reserve(){
cond.mutex.lock()
isUsed = true
Expand All @@ -86,31 +79,8 @@ public final class Connection: ConnectionProtocol {
cond.mutex.unlock()
}

func readHeaderPacket() throws -> (Int, OKPacket?) {
let (bytes, _) = try stream.readPacket()
if let okPacket = try OKPacket(bytes: bytes) {
return (0, okPacket)
} else {
let (_num, n) = lenEncInt(bytes)
if let num = _num, (n - bytes.count) == 0 {
return (Int(num), nil)
} else {
return (0, nil)
}
}
}

func readUntilEOF() throws {
while true {
let (bytes, _) = try stream.readPacket()
if bytes[0] == 0xfe {
break
}
}
}

public func close() throws {
try write(.quit)
try stream.write(.quit)
stream.close()
_isClosed = true
}
Expand Down
92 changes: 92 additions & 0 deletions Sources/SwiftMysql/Error.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Error.swift
// SwiftMysqlPackageDescription
//
// Created by Yuki Takei on 2017/11/06.
//

import Foundation

func createErrorFrom(errorPacket bytes :[UInt8]) -> MysqlError {
if bytes[0] != 0xff {
return MysqlClientError.error(code: -1, message: "EOF encountered")
}

let errno = bytes[1...3].uInt16()
var pos = 3

if bytes[3] == 0x23 {
pos = 9
}
var d1 = Array(bytes[pos..<bytes.count])
d1.append(0)
let errStr = d1.string()

if errno > 2000 {
return MysqlClientError.error(code: Int16(errno), message: errStr ?? "Unknown")
}

return MysqlServerError.error(code: Int16(errno), message: errStr ?? "Unknown")
}

public protocol MysqlError: Error {
var code: Int16 { get }
var message: String { get }
}

public enum MysqlClientError: MysqlError {
case commandsOutOfSync
case error(code: Int16, message: String)
}

extension MysqlClientError {
public var code: Int16 {
switch self {
case .commandsOutOfSync:
return 2014
case .error(let code, _):
return code
}
}

public var message: String {
switch self {
case .commandsOutOfSync:
return "Commands out of sync; you can't run this command now"
case .error(_, let mes):
return mes
}
}
}

extension MysqlClientError: CustomStringConvertible {
public var description: String {
return "MYSQL CLIENT ERROR \(code): \(message)"
}
}

public enum MysqlServerError: MysqlError {
case error(code: Int16, message: String)
}

extension MysqlServerError {
public var code: Int16 {
switch self {
case .error(let code, _):
return code
}
}

public var message: String {
switch self {
case .error(_, let mes):
return mes
}
}
}

extension MysqlServerError: CustomStringConvertible {
public var description: String {
return "MYSQL SERVER ERROR \(code): \(message)"
}
}
45 changes: 24 additions & 21 deletions Sources/SwiftMysql/Protocol/Packet/BinaryRowDataPacket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BinaryRowDataPacket: RowDataParsable {
self.columns = columns
}

func parse(bytes: [UInt8]) throws -> Row? {
func parse(bytes: [UInt8]) throws -> [Any?]? {
if columns.isEmpty {
return nil
}
Expand All @@ -33,7 +33,7 @@ class BinaryRowDataPacket: RowDataParsable {

var pos = 1 + (columns.count + 7 + 2)>>3
let nullBitmap = Array(bytes[1..<pos])
var row = Row()
var rows = [Any?]()

for index in 0..<columns.count {
let idx = (index+2)>>3
Expand All @@ -42,59 +42,60 @@ class BinaryRowDataPacket: RowDataParsable {
let column = columns[index]

if (val & 1) == 1 {
row[column.name] = NSNull()
rows.append(nil)
continue
}

let row: Any?
switch column.fieldType {
case .null:
row[column.name] = NSNull()
row = nil

case .tiny:
row[column.name] = column.flags.isUnsigned() ? UInt8(bytes[pos..<pos+1]) : Int8(bytes[pos..<pos+1])
row = column.flags.isUnsigned() ? UInt8(bytes[pos..<pos+1]) : Int8(bytes[pos..<pos+1])
pos += 1

case .short:
row[column.name] = column.flags.isUnsigned() ? UInt16(bytes[pos..<pos+2]) : Int16(bytes[pos..<pos+2])
row = column.flags.isUnsigned() ? UInt16(bytes[pos..<pos+2]) : Int16(bytes[pos..<pos+2])
pos += 2

case .int24, .long:
row[column.name] = column.flags.isUnsigned() ? UInt(UInt32(bytes[pos..<pos+4])) : Int(Int32(bytes[pos..<pos+4]))
row = column.flags.isUnsigned() ? UInt(UInt32(bytes[pos..<pos+4])) : Int(Int32(bytes[pos..<pos+4]))
pos += 4

case .longlong:
row[column.name] = column.flags.isUnsigned() ? UInt64(bytes[pos..<pos+8]) : Int64(bytes[pos..<pos+8])
row = column.flags.isUnsigned() ? UInt64(bytes[pos..<pos+8]) : Int64(bytes[pos..<pos+8])
pos += 8

case .float:
row[column.name] = bytes[pos..<pos+4].float32()
row = bytes[pos..<pos+4].float32()
pos += 4

case .double:
row[column.name] = bytes[pos..<pos+8].float64()
row = bytes[pos..<pos+8].float64()
pos += 8

case .blob, .mediumBlob, .varchar, .varString, .string, .longBlob:
if column.charSetNr == 63 {
let (bres, n) = lenEncBin(Array(bytes[pos..<bytes.count]))
row[column.name] = bres
row = bres
pos += n
}
else {
let (str, n) = lenEncStr(Array(bytes[pos..<bytes.count]))
row[column.name] = str
row = str
pos += n
}

case .decimal, .newdecimal, .bit, .`enum`, .set, .geometory, .json:
let (str, n) = lenEncStr(Array(bytes[pos..<bytes.count]))
row[column.name] = str
row = str
pos += n

case .date:
let (_dlen, n) = lenEncInt(Array(bytes[pos..<bytes.count]))
guard let dlen = _dlen else {
row[column.name] = NSNull()
row = nil
break
}

Expand All @@ -116,14 +117,14 @@ class BinaryRowDataPacket: RowDataParsable {
break
}

row[column.name] = res ?? NSNull()
row = res

pos += n + Int(dlen)

case .time:
let (_dlen, n) = lenEncInt(Array(bytes[pos..<bytes.count]))
guard let dlen = _dlen else {
row[column.name] = NSNull()
row = nil
break
}

Expand All @@ -147,15 +148,15 @@ class BinaryRowDataPacket: RowDataParsable {
break
}

row[column.name] = res ?? NSNull()
row = res

pos += n + Int(dlen)

case .timestamp, .datetime:
let (_dlen, n) = lenEncInt(Array(bytes[pos..<bytes.count]))

guard let dlen = _dlen else {
row[column.name] = NSNull()
row = nil
break
}

Expand All @@ -182,14 +183,16 @@ class BinaryRowDataPacket: RowDataParsable {
}

let dstr = String(format: "%4d-%02d-%02d %02d:%02d:%02d.%06d", arguments: [y, mo, d, h, m, s, u])
row[column.name] = Date(dateTimeStringUsec: dstr) ?? NSNull()
row = Date(dateTimeStringUsec: dstr)

pos += n + Int(dlen)
default:
row[column.name] = NSNull()
row = nil
}

rows.append(row)
}

return row
return rows
}
}

0 comments on commit 031a3eb

Please sign in to comment.